[Erp5-report] r37609 rafael - in /erp5/tags/version-5.4.6: ./ bt5/ products/ products/CMFAc...
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Aug 6 18:36:16 CEST 2010
Author: rafael
Date: Fri Aug 6 18:36:16 2010
New Revision: 37609
URL: http://svn.erp5.org?rev=37609&view=rev
Log:
Release version 5.4.6.
Tag corresponds to revision 37390 and includes a few changes to CMFActivity, ERP5Catalog and ERP5Type to improve unindex/uncatalog performance.
Added:
erp5/tags/version-5.4.6/
erp5/tags/version-5.4.6/bt5/ (props changed)
- copied from r37390, erp5/trunk/bt5/
erp5/tags/version-5.4.6/products/
- copied from r37390, erp5/trunk/products/
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
Removed:
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
Modified:
erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT
erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py
erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py
erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py
erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py
erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py
erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py
erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py
Propchange: erp5/tags/version-5.4.6/bt5/
------------------------------------------------------------------------------
--- svn:externals (added)
+++ svn:externals Fri Aug 6 18:36:16 2010
@@ -0,0 +1,12 @@
+#
+# Used for maintenance of external resources in this svn bundle. Edit
+# this file as appropriate and then run the following command from within
+# the checkout directory where this file lives on your local machine:
+#
+# svn propset svn:externals -F ./EXTERNALS.TXT .
+#
+
+erp5_accounting_l10n_fr_m9 -r 335 svn://svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
+# developers should use
+# svn+ssh://(developername)@svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
+
Modified: erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT [utf8] (original)
+++ erp5/tags/version-5.4.6/bt5/EXTERNALS.TXT [utf8] Fri Aug 6 18:36:16 2010
@@ -6,7 +6,7 @@
# svn propset svn:externals -F ./EXTERNALS.TXT .
#
-erp5_accounting_l10n_fr_m9 svn://svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
+erp5_accounting_l10n_fr_m9 -r 335 svn://svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
# developers should use
# svn+ssh://(developername)@svn.adullact.net/svnroot/erp5-m9/trunk/bt5/erp5_accounting_l10n_fr_m9
Modified: erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLBase.py [utf8] Fri Aug 6 18:36:16 2010
@@ -27,14 +27,20 @@
##############################################################################
import sys
+import transaction
from zLOG import LOG, TRACE, INFO, WARNING, ERROR
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
+from Products.CMFActivity.ActivityRuntimeEnvironment import (
+ ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import VALIDATION_ERROR_DELAY
+# Stop electing more messages for processing if more than this number of
+# objects are impacted by elected messages.
+MAX_GROUPED_OBJECTS = 100
def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList
@@ -129,6 +135,256 @@ class SQLBase:
LOG(self.__class__.__name__, severity, summary,
error=severity>INFO and sys.exc_info() or None)
+ def getReservedMessageList(self, activity_tool, date, processing_node,
+ limit=None, group_method_id=None):
+ """
+ Get and reserve a list of messages.
+ limit
+ Maximum number of messages to fetch.
+ This number is not guaranteed to be reached, because of:
+ - not enough messages pending execution
+ - race condition (other nodes reserving the same messages at the same
+ time)
+ This number is guaranteed not to be exceeded.
+ If None (or not given) no limit applies.
+ """
+ select = activity_tool.SQLBase_selectReservedMessageList
+ result = not group_method_id and select(table=self.sql_table, count=limit,
+ processing_node=processing_node)
+ if not result:
+ activity_tool.SQLBase_reserveMessageList(table=self.sql_table,
+ count=limit, processing_node=processing_node, to_date=date,
+ group_method_id=group_method_id)
+ result = select(table=self.sql_table,
+ processing_node=processing_node, count=limit)
+ return result
+
+ def makeMessageListAvailable(self, activity_tool, uid_list):
+ """
+ Put messages back in processing_node=0 .
+ """
+ if len(uid_list):
+ activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
+ uid=uid_list)
+
+ def getProcessableMessageList(self, activity_tool, processing_node):
+ """
+ Always true:
+ For each reserved message, delete redundant messages when it gets
+ reserved (definitely lost, but they are expendable since redundant).
+
+ - reserve a message
+ - set reserved message to processing=1 state
+ - if this message has a group_method_id:
+ - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+ - until number of impacted objects goes over MAX_GROUPED_OBJECTS
+ - get one message from the reserved bunch (this message will be
+ "needed")
+ - increase the number of impacted objects
+ - set "needed" reserved messages to processing=1 state
+ - unreserve "unneeded" messages
+ - return still-reserved message list and a group_method_id
+
+ If any error happens in above described process, try to unreserve all
+ messages already reserved in that process.
+ If it fails, complain loudly that some messages might still be in an
+ unclean state.
+
+ Returned values:
+ 4-tuple:
+ - list of messages
+ - impacted object count
+ - group_method_id
+ - uid_to_duplicate_uid_list_dict
+ """
+ def getReservedMessageList(limit, group_method_id=None):
+ line_list = self.getReservedMessageList(activity_tool=activity_tool,
+ date=now_date,
+ processing_node=processing_node,
+ limit=limit,
+ group_method_id=group_method_id)
+ if len(line_list):
+ self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
+ return line_list
+ def getDuplicateMessageUidList(line):
+ uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
+ line=line, processing_node=processing_node)
+ if len(uid_list):
+ self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
+ return uid_list
+ BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
+ now_date = self.getNow(activity_tool)
+ message_list = []
+ count = 0
+ group_method_id = None
+ try:
+ result = getReservedMessageList(limit=1)
+ uid_to_duplicate_uid_list_dict = {}
+ if len(result) > 0:
+ line = result[0]
+ uid = line.uid
+ m = self.loadMessage(line.message, uid=uid, line=line)
+ message_list.append(m)
+ group_method_id = line.group_method_id
+ activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
+ uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
+ .extend(getDuplicateMessageUidList(line))
+ if group_method_id not in (None, '', '\0'):
+ # Count the number of objects to prevent too many objects.
+ count += len(m.getObjectList(activity_tool))
+ if count < MAX_GROUPED_OBJECTS:
+ # Retrieve objects which have the same group method.
+ result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT,
+ group_method_id=group_method_id)
+ path_and_method_id_dict = {}
+ unreserve_uid_list = []
+ for line in result:
+ if line.uid == uid:
+ continue
+ # All fetched lines have the same group_method_id and
+ # processing_node.
+ # Their dates are lower-than or equal-to now_date.
+ # We read each line once so lines have distinct uids.
+ # So what remains to be filtered on are path, method_id and
+ # order_validation_text.
+ try:
+ key = line.path, line.method_id, line.order_validation_text
+ except AttributeError:
+ pass # message_queue does not have 'order_validation_text'
+ else:
+ original_uid = path_and_method_id_dict.get(key)
+ if original_uid is not None:
+ uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
+ .append(line.uid)
+ continue
+ path_and_method_id_dict[key] = line.uid
+ uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
+ .extend(getDuplicateMessageUidList(line))
+ if count < MAX_GROUPED_OBJECTS:
+ m = self.loadMessage(line.message, uid=line.uid, line=line)
+ count += len(m.getObjectList(activity_tool))
+ message_list.append(m)
+ else:
+ unreserve_uid_list.append(line.uid)
+ activity_tool.SQLBase_processMessage(table=self.sql_table,
+ uid=[m.uid for m in message_list])
+ # Unreserve extra messages as soon as possible.
+ self.makeMessageListAvailable(activity_tool=activity_tool,
+ uid_list=unreserve_uid_list)
+ return (message_list, count, group_method_id,
+ uid_to_duplicate_uid_list_dict)
+ except:
+ self._log(WARNING, 'Exception while reserving messages.')
+ if len(message_list):
+ to_free_uid_list = [m.uid for m in message_list]
+ try:
+ self.makeMessageListAvailable(activity_tool=activity_tool,
+ uid_list=to_free_uid_list)
+ except:
+ self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
+ else:
+ if len(to_free_uid_list):
+ self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
+ else:
+ self._log(TRACE, '(no message was reserved)')
+ return [], 0, None, {}
+
+ # Queue semantic
+ def dequeueMessage(self, activity_tool, processing_node):
+ def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
+ final_uid_list = []
+ for uid in uid_list:
+ final_uid_list.append(uid)
+ final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+ self.makeMessageListAvailable(activity_tool=activity_tool,
+ uid_list=final_uid_list)
+ message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
+ self.getProcessableMessageList(activity_tool, processing_node)
+ if message_list:
+ # Remove group_id parameter from group_method_id
+ if group_method_id is not None:
+ group_method_id = group_method_id.split('\0')[0]
+ if group_method_id not in (None, ""):
+ method = activity_tool.invokeGroup
+ args = (group_method_id, message_list, self.__class__.__name__)
+ activity_runtime_environment = ActivityRuntimeEnvironment(None)
+ else:
+ method = activity_tool.invoke
+ message = message_list[0]
+ args = (message, )
+ activity_runtime_environment = ActivityRuntimeEnvironment(message)
+ # Commit right before executing messages.
+ # As MySQL transaction does not start exactly at the same time as ZODB
+ # transactions but a bit later, messages available might be called
+ # on objects which are not available - or available in an old
+ # version - to ZODB connector.
+ # So all connectors must be committed now that we have selected
+ # everything needed from MySQL to get a fresh view of ZODB objects.
+ transaction.commit()
+ tv = getTransactionalVariable(None)
+ tv['activity_runtime_environment'] = activity_runtime_environment
+ # Try to invoke
+ try:
+ method(*args)
+ except:
+ self._log(WARNING,
+ 'Exception raised when invoking messages (uid, path, method_id) %r'
+ % [(m.uid, m.object_path, m.method_id) for m in message_list])
+ try:
+ transaction.abort()
+ except:
+ # Unfortunately, database adapters may raise an exception against
+ # abort.
+ self._log(PANIC,
+ 'abort failed, thus some objects may be modified accidentally')
+ raise
+ # XXX Is it still useful to free messages now that this node is able
+ # to reselect them ?
+ to_free_uid_list = [x.uid for x in message_list]
+ try:
+ makeMessageListAvailable(to_free_uid_list,
+ uid_to_duplicate_uid_list_dict)
+ except:
+ self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
+ else:
+ self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
+ # Abort if something failed.
+ if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
+ endTransaction = transaction.abort
+ else:
+ endTransaction = transaction.commit
+ try:
+ endTransaction()
+ except:
+ self._log(WARNING,
+ 'Failed to end transaction for messages (uid, path, method_id) %r'
+ % [(m.uid, m.object_path, m.method_id) for m in message_list])
+ if endTransaction == transaction.abort:
+ self._log(PANIC, 'Failed to abort executed messages.'
+ ' Some objects may be modified accidentally.')
+ else:
+ try:
+ transaction.abort()
+ except:
+ self._log(PANIC, 'Failed to abort executed messages which also'
+ ' failed to commit. Some objects may be modified accidentally.')
+ raise
+ exc_info = sys.exc_info()
+ for m in message_list:
+ m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
+ try:
+ makeMessageListAvailable([x.uid for x in message_list],
+ uid_to_duplicate_uid_list_dict)
+ except:
+ self._log(ERROR, 'Failed to free remaining messages: %r'
+ % (message_list, ))
+ else:
+ self._log(TRACE, 'Freed messages %r' % (message_list, ))
+ self.finalizeMessageExecution(activity_tool, message_list,
+ uid_to_duplicate_uid_list_dict)
+ transaction.commit()
+ return not message_list
+
def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None):
"""
Modified: erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLDict.py [utf8] Fri Aug 6 18:36:16 2010
@@ -29,16 +29,10 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH
from RAMDict import RAMDict
-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
-from ZODB.POSException import ConflictError
import sys
-from types import ClassType
#from time import time
from SQLBase import SQLBase, sort_message_key
-from Products.CMFActivity.ActivityRuntimeEnvironment import (
- ActivityRuntimeEnvironment, getTransactionalVariable)
-from zExceptions import ExceptionFormatter
import transaction
@@ -48,9 +42,6 @@ from zLOG import LOG, TRACE, WARNING, ER
MAX_VALIDATED_LIMIT = 1000
# Read up to this number of messages to validate.
READ_MESSAGE_LIMIT = 1000
-# Stop electing more messages for processing if more than this number of
-# objects are impacted by elected messages.
-MAX_GROUPED_OBJECTS = 100
MAX_MESSAGE_LIST_SIZE = 100
@@ -131,33 +122,6 @@ class SQLDict(RAMDict, SQLBase):
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
- def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
- """
- Get and reserve a list of messages.
- limit
- Maximum number of messages to fetch.
- This number is not garanted to be reached, because of:
- - not enough messages being pending execution
- - race condition (other nodes reserving the same messages at the same
- time)
- This number is guaranted not to be exceeded.
- If None (or not given) no limit apply.
- """
- result = not group_method_id and \
- activity_tool.SQLDict_selectReservedMessageList(
- processing_node=processing_node, count=limit)
- if not result:
- activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
- result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
- return result
-
- def makeMessageListAvailable(self, activity_tool, uid_list):
- """
- Put messages back in processing_node=0 .
- """
- if len(uid_list):
- activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
-
def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
Reserve unreserved messages matching given line.
@@ -188,202 +152,7 @@ class SQLDict(RAMDict, SQLBase):
raise
return uid_list
- def getProcessableMessageList(self, activity_tool, processing_node):
- """
- Always true:
- For each reserved message, delete redundant messages when it gets
- reserved (definitely lost, but they are expandable since redundant).
-
- - reserve a message
- - set reserved message to processing=1 state
- - if this message has a group_method_id:
- - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- - get one message from the reserved bunch (this messages will be
- "needed")
- - increase the number of impacted object
- - set "needed" reserved messages to processing=1 state
- - unreserve "unneeded" messages
- - return still-reserved message list and a group_method_id
-
- If any error happens in above described process, try to unreserve all
- messages already reserved in that process.
- If it fails, complain loudly that some messages might still be in an
- unclean state.
-
- Returned values:
- 4-tuple:
- - list of messages
- - impacted object count
- - group_method_id
- - uid_to_duplicate_uid_list_dict
- """
- def getReservedMessageList(limit, group_method_id=None):
- line_list = self.getReservedMessageList(activity_tool=activity_tool,
- date=now_date,
- processing_node=processing_node,
- limit=limit,
- group_method_id=group_method_id)
- if len(line_list):
- LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
- return line_list
- def getDuplicateMessageUidList(line):
- uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
- line=line, processing_node=processing_node)
- if len(uid_list):
- LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
- return uid_list
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
- now_date = self.getNow(activity_tool)
- message_list = []
- count = 0
- group_method_id = None
- try:
- result = getReservedMessageList(limit=1)
- uid_to_duplicate_uid_list_dict = {}
- if len(result) > 0:
- line = result[0]
- uid = line.uid
- m = self.loadMessage(line.message, uid=uid, line=line)
- message_list.append(m)
- group_method_id = line.group_method_id
- activity_tool.SQLDict_processMessage(uid=[uid])
- uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
- .extend(getDuplicateMessageUidList(line))
- if group_method_id not in (None, '', '\0'):
- # Count the number of objects to prevent too many objects.
- count += len(m.getObjectList(activity_tool))
- if count < MAX_GROUPED_OBJECTS:
- # Retrieve objects which have the same group method.
- result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
- path_and_method_id_dict = {}
- unreserve_uid_list = []
- for line in result:
- if line.uid == uid:
- continue
- # All fetched lines have the same group_method_id and
- # processing_node.
- # Their dates are lower-than or equal-to now_date.
- # We read each line once so lines have distinct uids.
- # So what remains to be filtered on are path, method_id and
- # order_validation_text.
- key = (line.path, line.method_id, line.order_validation_text)
- original_uid = path_and_method_id_dict.get(key)
- if original_uid is not None:
- uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
- continue
- path_and_method_id_dict[key] = line.uid
- uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
- if count < MAX_GROUPED_OBJECTS:
- m = self.loadMessage(line.message, uid=line.uid, line=line)
- count += len(m.getObjectList(activity_tool))
- message_list.append(m)
- else:
- unreserve_uid_list.append(line.uid)
- activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
- # Unreserve extra messages as soon as possible.
- makeMessageListAvailable(unreserve_uid_list)
- return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
- except:
- LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
- if len(message_list):
- to_free_uid_list = [m.uid for m in message_list]
- try:
- makeMessageListAvailable(to_free_uid_list)
- except:
- LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- if len(to_free_uid_list):
- LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
- else:
- LOG('SQLDict', TRACE, '(no message was reserved)')
- return [], 0, None, {}
-
- # Queue semantic
- def dequeueMessage(self, activity_tool, processing_node):
- def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
- final_uid_list = []
- for uid in uid_list:
- final_uid_list.append(uid)
- final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
- message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
- self.getProcessableMessageList(activity_tool, processing_node)
- if message_list:
- # Remove group_id parameter from group_method_id
- if group_method_id is not None:
- group_method_id = group_method_id.split('\0')[0]
- if group_method_id not in (None, ""):
- method = activity_tool.invokeGroup
- args = (group_method_id, message_list)
- activity_runtime_environment = ActivityRuntimeEnvironment(None)
- else:
- method = activity_tool.invoke
- message = message_list[0]
- args = (message, )
- activity_runtime_environment = ActivityRuntimeEnvironment(message)
- # Commit right before executing messages.
- # As MySQL transaction does not start exactly at the same time as ZODB
- # transactions but a bit later, messages available might be called
- # on objects which are not available - or available in an old
- # version - to ZODB connector.
- # So all connectors must be committed now that we have selected
- # everything needed from MySQL to get a fresh view of ZODB objects.
- transaction.commit()
- tv = getTransactionalVariable(None)
- tv['activity_runtime_environment'] = activity_runtime_environment
- # Try to invoke
- try:
- method(*args)
- except:
- LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
- try:
- transaction.abort()
- except:
- # Unfortunately, database adapters may raise an exception against abort.
- LOG('SQLDict', PANIC,
- 'abort failed, thus some objects may be modified accidentally')
- raise
- # XXX Is it still useful to free messages now that this node is able
- # to reselect them ?
- to_free_uid_list = [x.uid for x in message_list]
- try:
- makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
- except:
- LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
- # Abort if something failed.
- if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
- endTransaction = transaction.abort
- else:
- endTransaction = transaction.commit
- try:
- endTransaction()
- except:
- LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
- if endTransaction == transaction.abort:
- LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
- else:
- try:
- transaction.abort()
- except:
- LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
- raise
- exc_info = sys.exc_info()
- for m in message_list:
- m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
- try:
- makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
- except:
- LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
- else:
- LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
- self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
- transaction.commit()
- return not message_list
+ dequeueMessage = SQLBase.dequeueMessage
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
@@ -530,9 +299,10 @@ class SQLDict(RAMDict, SQLBase):
message_list.sort(key=sort_message_key)
deletable_uid_list += [m.uid for m in message_list[1:]]
message = message_list[0]
- distributable_uid_set.add(message.uid)
serialization_tag = message.activity_kw.get('serialization_tag')
- if serialization_tag is not None:
+ if serialization_tag is None:
+ distributable_uid_set.add(message.uid)
+ else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
# Don't let through if there is the same serialization tag in the
@@ -542,15 +312,15 @@ class SQLDict(RAMDict, SQLBase):
# does not stop validating together. Because those messages should
# be processed together at once.
for message_list in serialization_tag_dict.itervalues():
- if len(message_list) == 1:
- continue
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
+ distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].activity_kw.get('group_method_id')
+ if group_method_id is None:
+ continue
for message in message_list[1:]:
- if group_method_id is None or \
- group_method_id != message.activity_kw.get('group_method_id'):
- distributable_uid_set.remove(message.uid)
+ if group_method_id == message.activity_kw.get('group_method_id'):
+ distributable_uid_set.add(message.uid)
if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list)
Modified: erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/Activity/SQLQueue.py [utf8] Fri Aug 6 18:36:16 2010
@@ -29,15 +29,9 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH
-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
-from types import ClassType
-import sys
-from time import time
from SQLBase import SQLBase, sort_message_key
-from Products.CMFActivity.ActivityRuntimeEnvironment import (
- ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter
import transaction
@@ -48,17 +42,6 @@ from zLOG import LOG, WARNING, ERROR, IN
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
-# Process this many messages in each dequeueMessage call.
-# Downside of setting to a "small" value: the cost of reserving a batch of
-# few messages increases relatively to the cost of executing activities,
-# making CMFActivity overhead significant.
-# Downside of setting to a "big" value: if there are many slow activities in
-# a multi-activity-node environment, multiple slow activities will be reserved
-# by a single node, making a suboptimal use of the parallelisation offered by
-# the cluster.
-# Before increasing this value, consider using SQLDict with group methods
-# first.
-MESSAGE_BUNDLE_SIZE = 1
MAX_MESSAGE_LIST_SIZE = 100
@@ -83,6 +66,8 @@ class SQLQueue(RAMQueue, SQLBase):
method_id_list = [m.method_id for m in registered_message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list]
date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list]
+ group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
+ for message in registered_message_list]
tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list]
dumped_message_list = [self.dumpMessage(m) for m in registered_message_list]
@@ -92,6 +77,7 @@ class SQLQueue(RAMQueue, SQLBase):
method_id_list=method_id_list,
priority_list=priority_list,
message_list=dumped_message_list,
+ group_method_id_list = group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=None,
@@ -110,162 +96,14 @@ class SQLQueue(RAMQueue, SQLBase):
# Nothing to do in SQLQueue.
pass
- def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
+ def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
- Get and reserve a list of messages.
- limit
- Maximum number of messages to fetch.
- This number is not garanted to be reached, because of:
- - not enough messages being pending execution
- - race condition (other nodes reserving the same messages at the same
- time)
- This number is guaranted not to be exceeded.
- If None (or not given) no limit apply.
+ Reserve unreserved messages matching given line.
+ Return their uids.
"""
- result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
- if len(result) == 0:
- activity_tool.SQLQueue_reserveMessageList(count=limit, processing_node=processing_node, to_date=date)
- result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
- return result
-
- def makeMessageListAvailable(self, activity_tool, uid_list):
- """
- Put messages back in processing_node=0 .
- """
- if len(uid_list):
- activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
-
- def getProcessableMessageList(self, activity_tool, processing_node):
- """
- Always true:
- For each reserved message, delete redundant messages when it gets
- reserved (definitely lost, but they are expandable since redundant).
-
- - reserve a message
- - set reserved message to processing=1 state
- - if this message has a group_method_id:
- - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- - get one message from the reserved bunch (this messages will be
- "needed")
- - increase the number of impacted object
- - set "needed" reserved messages to processing=1 state
- - unreserve "unneeded" messages
- - return still-reserved message list
-
- If any error happens in above described process, try to unreserve all
- messages already reserved in that process.
- If it fails, complain loudly that some messages might still be in an
- unclean state.
-
- Returned values:
- list of messages
- """
- def getReservedMessageList(limit):
- line_list = self.getReservedMessageList(activity_tool=activity_tool,
- date=now_date,
- processing_node=processing_node,
- limit=limit)
- if len(line_list):
- LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
- return line_list
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- now_date = self.getNow(activity_tool)
- message_list = []
- try:
- result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
- for line in result:
- m = self.loadMessage(line.message, uid=line.uid, line=line)
- message_list.append(m)
- if len(message_list):
- activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
- return message_list
- except:
- LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
- if len(message_list):
- to_free_uid_list = [m.uid for m in message_list]
- try:
- makeMessageListAvailable(to_free_uid_list)
- except:
- LOG('SQLQueue', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- if len(to_free_uid_list):
- LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
- else:
- LOG('SQLQueue', TRACE, '(no message was reserved)')
- return []
-
- def dequeueMessage(self, activity_tool, processing_node):
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- message_list = \
- self.getProcessableMessageList(activity_tool, processing_node)
- if message_list:
- processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
- processed_count = 0
- # Commit right before executing messages.
- # As MySQL transaction does not start exactly at the same time as ZODB
- # transactions but a bit later, messages available might be called
- # on objects which are not available - or available in an old
- # version - to ZODB connector.
- # So all connectors must be committed now that we have selected
- # everything needed from MySQL to get a fresh view of ZODB objects.
- transaction.commit()
- tv = getTransactionalVariable(None)
- for m in message_list:
- tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
- processed_count += 1
- # Try to invoke
- try:
- activity_tool.invoke(m)
- if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
- # Commit so that if a message raises it doesn't causes previous
- # successfull messages to be rolled back. This commit might fail,
- # so it is protected the same way as activity execution by the
- # same "try" block.
- transaction.commit()
- else:
- # This message failed, abort.
- transaction.abort()
- except:
- value = m.uid, m.object_path, m.method_id
- LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
- try:
- transaction.abort()
- except:
- # Unfortunately, database adapters may raise an exception against abort.
- LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
- raise
- # We must make sure that the message is not set as executed.
- # It is possible that the message is executed but the commit
- # of the transaction fails
- m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
- # XXX Is it still useful to free message now that this node is able
- # to reselect it ?
- try:
- makeMessageListAvailable([m.uid])
- except:
- LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
- else:
- LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
- if time() > processing_stop_time:
- LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
- break
- # Release all unprocessed messages
- to_free_uid_list = [m.uid for m in message_list[processed_count:]]
- if to_free_uid_list:
- try:
- makeMessageListAvailable(to_free_uid_list)
- except:
- LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
- self.finalizeMessageExecution(activity_tool,
- message_list[:processed_count])
- transaction.commit()
- return not message_list
+ return ()
+ dequeueMessage = SQLBase.dequeueMessage
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
@@ -416,6 +254,12 @@ class SQLQueue(RAMQueue, SQLBase):
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
+ group_method_id = message_list[0].activity_kw.get('group_method_id')
+ if group_method_id is None:
+ continue
+ for message in message_list[1:]:
+ if group_method_id == message.activity_kw.get('group_method_id'):
+ distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set)
if distributable_count:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
Modified: erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/ActivityTool.py [utf8] Fri Aug 6 18:36:16 2010
@@ -1172,16 +1172,18 @@ class ActivityTool (Folder, UniqueObject
for held in my_self.REQUEST._held:
self.REQUEST._hold(held)
- def invokeGroup(self, method_id, message_list):
+ def invokeGroup(self, method_id, message_list, activity):
if self.activity_tracking:
- activity_tracking_logger.info('invoking group messages: method_id=%s, paths=%s' % (method_id, ['/'.join(m.object_path) for m in message_list]))
+ activity_tracking_logger.info(
+ 'invoking group messages: method_id=%s, paths=%s'
+ % (method_id, ['/'.join(m.object_path) for m in message_list]))
# Invoke a group method.
- object_list = []
expanded_object_list = []
new_message_list = []
- path_dict = {}
- # Filter the list of messages. If an object is not available, mark its message as non-executable.
- # In addition, expand an object if necessary, and make sure that no duplication happens.
+ path_set = set()
+ # Filter the list of messages. If an object is not available, mark its
+ # message as non-executable. In addition, expand an object if necessary,
+ # and make sure that no duplication happens.
for m in message_list:
# alternate method is used to segregate objects which cannot be grouped.
alternate_method_id = m.activity_kw.get('alternate_method_id')
@@ -1195,42 +1197,29 @@ class ActivityTool (Folder, UniqueObject
m.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=self)
continue
try:
- i = len(new_message_list) # This is an index of this message in new_message_list.
- if m.hasExpandMethod():
- for subobj in m.getObjectList(self):
+ if activity == 'SQLQueue':
+ expanded_object_list.append((obj, m.args, m.kw))
+ else:
+ if m.hasExpandMethod():
+ subobject_list = m.getObjectList(self)
+ else:
+ subobject_list = (obj,)
+ for subobj in subobject_list:
path = subobj.getPath()
- if path not in path_dict:
- path_dict[path] = i
+ if path not in path_set:
+ path_set.add(path)
if alternate_method_id is not None \
and hasattr(aq_base(subobj), alternate_method_id):
- # if this object is alternated, generate a new single active object.
+ # if this object is alternated,
+ # generate a new single active object
activity_kw = m.activity_kw.copy()
- if 'group_method_id' in activity_kw:
- del activity_kw['group_method_id']
- if 'group_id' in activity_kw:
- del activity_kw['group_id']
- active_obj = subobj.activate(**activity_kw)
+ activity_kw.pop('group_method_id', None)
+ activity_kw.pop('group_id', None)
+ active_obj = subobj.activate(activity=activity, **activity_kw)
getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
else:
- expanded_object_list.append(subobj)
- else:
- path = obj.getPath()
- if path not in path_dict:
- path_dict[path] = i
- if alternate_method_id is not None \
- and hasattr(aq_base(obj), alternate_method_id):
- # if this object is alternated, generate a new single active object.
- activity_kw = m.activity_kw.copy()
- if 'group_method_id' in activity_kw:
- del activity_kw['group_method_id']
- if 'group_id' in activity_kw:
- del activity_kw['group_id']
- active_obj = obj.activate(**activity_kw)
- getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
- else:
- expanded_object_list.append(obj)
- object_list.append(obj)
- new_message_list.append(m)
+ expanded_object_list.append((subobj, m.args, m.kw))
+ new_message_list.append((m, obj))
except:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
@@ -1238,39 +1227,36 @@ class ActivityTool (Folder, UniqueObject
if len(expanded_object_list) > 0:
method = self.unrestrictedTraverse(method_id)
# FIXME: how to apply security here?
- # NOTE: expanded_object_list must be set to failed objects by the callee.
- # If it fully succeeds, expanded_object_list must be empty when returning.
- result = method(expanded_object_list, **m.kw)
+ # NOTE: expanded_object_list must be set to failed objects by the
+ # callee. If it fully succeeds, expanded_object_list must be
+ # empty when returning.
+ result = method(expanded_object_list)
else:
result = None
except:
# In this case, the group method completely failed.
exc_info = sys.exc_info()
- for m in new_message_list:
- m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
+ for m, obj in new_message_list:
+ m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
LOG('WARNING ActivityTool', 0,
'Could not call method %s on objects %s' %
- (method_id, expanded_object_list), error=exc_info)
+ (method_id, [x[0] for x in expanded_object_list]), error=exc_info)
error_log = getattr(self, 'error_log', None)
if error_log is not None:
error_log.raising(exc_info)
else:
- # Obtain all indices of failed messages. Note that this can be a partial failure.
- failed_message_dict = {}
- for obj in expanded_object_list:
- path = obj.getPath()
- i = path_dict[path]
- failed_message_dict[i] = None
-
+ # Obtain the ids of the kw dicts of all failed messages.
+ # Note that this can be a partial failure.
+ failed_message_set = set(id(x[2]) for x in expanded_object_list)
# Only for succeeded messages, an activity process is invoked (if any).
- for i in xrange(len(object_list)):
- object = object_list[i]
- m = new_message_list[i]
- if i in failed_message_dict:
+ for m, obj in new_message_list:
+ # We use id of kw dict (persistent object) to know if there is a
+ # failed 3-tuple corresponding to Message m.
+ if id(m.kw) in failed_message_set:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
else:
try:
- m.activateResult(self, result, object)
+ m.activateResult(self, result, obj)
except:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
else:
Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql [utf8] Fri Aug 6 18:36:16 2010
@@ -0,0 +1,20 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+uid</params>
+UPDATE
+ <dtml-var table>
+SET
+ processing_node=0,
+ processing=0
+WHERE
+ <dtml-sqltest uid type="int" multiple>
+<dtml-var sql_delimiter>
+COMMIT
Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql [utf8] Fri Aug 6 18:36:16 2010
@@ -0,0 +1,20 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:1
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+uid</params>
+UPDATE
+ <dtml-var table>
+SET
+ processing_date = UTC_TIMESTAMP(),
+ processing = 1
+WHERE
+ <dtml-sqltest uid type="int" multiple>
+<dtml-var sql_delimiter>
+COMMIT
Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql [utf8] Fri Aug 6 18:36:16 2010
@@ -0,0 +1,39 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+processing_node
+to_date
+count
+group_method_id
+</params>
+UPDATE
+ <dtml-var table>
+SET
+ processing_node=<dtml-sqlvar processing_node type="int">
+WHERE
+ processing_node=0
+ AND date <= <dtml-sqlvar to_date type="datetime">
+ <dtml-if expr="group_method_id is not None">
+ AND group_method_id = <dtml-sqlvar group_method_id type="string">
+ </dtml-if>
+ORDER BY
+<dtml-comment>
+ Explanation of the order by:
+ - priority must be respected (it is a feature)
+ - when multiple nodes simultaneously try to fetch activities, they should not
+ be given the same set of lines as it would cause all minus one to wait for
+ a write lock (and be ultimately aborted), effectively serializing their
+ action (so breaking parallelism).
+ So we must force MySQL to update lines in a random order.
+</dtml-comment>
+ priority, RAND()
+LIMIT <dtml-sqlvar count type="int">
+<dtml-var sql_delimiter>
+COMMIT
Added: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql?rev=37609&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql (added)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql [utf8] Fri Aug 6 18:36:16 2010
@@ -0,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>table
+processing_node
+count</params>
+SELECT
+ *
+FROM
+ <dtml-var table>
+WHERE
+ processing_node = <dtml-sqlvar processing_node type="int">
+<dtml-if expr="count is not None">
+ LIMIT <dtml-sqlvar count type="int">
+</dtml-if>
Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
- message
-SET
- processing_node=0,
- processing=0
-WHERE
- uid IN (
- <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
- )
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid</params>
-UPDATE message
-SET
- processing_date = UTC_TIMESTAMP(),
- processing = 1
-WHERE
- uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
- )
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql (removed)
@@ -1,36 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-group_method_id
-</params>
-UPDATE
- message
-SET
- processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
- processing_node=0
- AND date <= <dtml-sqlvar to_date type="datetime">
- <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
-ORDER BY
-<dtml-comment>
- Explanation of the order by:
- - priority must be respected (it is a feature)
- - when multiple nodes simultaneously try to fetch activities, they should not
- be given the same set of lines as it would cause all minus one to wait for
- a write lock (and be ultimately aborted), effectively serializing their
- action (so breaking paralellism).
- So we must force MySQL to update lines in a random order.
-</dtml-comment>
- priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
- *
-FROM
- message
-WHERE
- processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
- LIMIT <dtml-sqlvar count type="int">
-</dtml-if>
Modified: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] Fri Aug 6 18:36:16 2010
@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0,
+ `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
- message_queue
-SET
- processing_node=0,
- processing=0
-WHERE
- uid IN (
- <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
- )
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql (removed)
@@ -1,34 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-</params>
-UPDATE
- message_queue
-SET
- processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
- processing_node=0
- AND date <= <dtml-sqlvar to_date type="datetime">
-ORDER BY
-<dtml-comment>
- Explanation of the order by:
- - priority must be respected (it is a feature)
- - when multiple nodes simultaneously try to fetch activities, they should not
- be given the same set of lines as it would cause all minus one to wait for
- a write lock (and be ultimately aborted), effectively serializing their
- action (so breaking paralellism).
- So we must force MySQL to update lines in a random order.
-</dtml-comment>
- priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql?rev=37390&view=auto
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
- *
-FROM
- message_queue
-WHERE
- processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
- LIMIT <dtml-sqlvar count type="int">
-</dtml-if>
Modified: erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] Fri Aug 6 18:36:16 2010
@@ -15,11 +15,12 @@ message_list
priority_list
processing_node_list
date_list
+group_method_id_list
tag_list
serialization_tag_list
</params>
INSERT INTO message_queue
-(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, tag, serialization_tag, message)
+(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
@@ -32,6 +33,7 @@ VALUES
<dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
+ <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
Modified: erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Aug 6 18:36:16 2010
@@ -430,7 +430,7 @@ class TestCMFActivity(ERP5TypeTestCase,
def TryActiveProcessInsideActivity(self, activity):
"""
Try two levels with active_process, we create one first
- activity with an acitive process, then this new activity
+ activity with an active process, then this new activity
uses another active process
"""
portal = self.getPortal()
@@ -1872,26 +1872,26 @@ class TestCMFActivity(ERP5TypeTestCase,
getattr(organisation, 'uid')
- def test_80_CallWithGroupIdParamater(self, quiet=0, run=run_all_test):
- """
- Test that group_id parameter is used to separate execution of the same method
- """
+ def callWithGroupIdParamater(self, activity, quiet, run):
if not run: return
if not quiet:
- message = '\nTest Activity with group_id parameter'
+ message = '\nTest Activity with group_id parameter (%s)' % activity
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
# Defined a group method
- def setFoobar(self, object_list, number=1):
- for obj in object_list:
+ foobar_list = []
+ def setFoobar(self, object_list):
+ foobar_list.append(len(object_list))
+ for obj, args, kw in object_list:
+ number = kw.get('number', 1)
if getattr(obj,'foobar', None) is not None:
obj.foobar = obj.foobar + number
else:
obj.foobar = number
- object_list[:] = []
+ del object_list[:]
from Products.ERP5Type.Document.Folder import Folder
Folder.setFoobar = setFoobar
@@ -1904,46 +1904,65 @@ class TestCMFActivity(ERP5TypeTestCase,
# Test group_method_id is working without group_id
for x in xrange(5):
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
transaction.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5)
portal.portal_activities.distribute()
portal.portal_activities.tic()
- self.assertEquals(1, organisation.getFoobar())
+ expected = dict(SQLDict=1, SQLQueue=5)[activity]
+ self.assertEquals(expected, organisation.getFoobar())
# Test group_method_id is working with one group_id defined
for x in xrange(5):
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5)
portal.portal_activities.distribute()
portal.portal_activities.tic()
- self.assertEquals(2, organisation.getFoobar())
+ self.assertEquals(expected * 2, organisation.getFoobar())
+
+ self.assertEquals([expected, expected], foobar_list)
+ del foobar_list[:]
# Test group_method_id is working with many group_id defined
for x in xrange(5):
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit()
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
transaction.commit()
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit()
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
transaction.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),20)
portal.portal_activities.distribute()
portal.portal_activities.tic()
- self.assertEquals(11, organisation.getFoobar())
+ self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
+ organisation.getFoobar())
+ self.assertEquals(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity],
+ sorted(foobar_list))
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list), 0)
-
+
+ def test_80a_CallWithGroupIdParamaterSQLDict(self, quiet=0, run=run_all_test):
+ """
+ Test that group_id parameter is used to separate execution of the same method
+ """
+ self.callWithGroupIdParamater('SQLDict', quiet=quiet, run=run)
+
+ def test_80b_CallWithGroupIdParamaterSQLQueue(self, quiet=0,
+ run=run_all_test):
+ """
+ Test that group_id parameter is used to separate execution of the same method
+ """
+ self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
"""
@@ -2009,7 +2028,7 @@ class TestCMFActivity(ERP5TypeTestCase,
transaction.commit()
self.tic()
activity_tool = self.getActivityTool()
- def modifySQLAndFail(self, object_list, **kw):
+ def modifySQLAndFail(self, object_list):
# Only create the dummy activity if none is present: we would just
# generate misleading errors (duplicate uid).
if activity_tool.countMessage(method_id='dummy_activity') == 0:
@@ -2437,7 +2456,7 @@ class TestCMFActivity(ERP5TypeTestCase,
transaction.commit()
self.tic()
activity_tool = self.getActivityTool()
- def modifySQL(self, object_list, *arg, **kw):
+ def modifySQL(self, object_list):
# Only create the dummy activity if none is present: we would just
# generate misleading errors (duplicate uid).
if activity_tool.countMessage(method_id='dummy_activity') == 0:
@@ -3147,7 +3166,7 @@ class TestCMFActivity(ERP5TypeTestCase,
self.assertEqual(len(message_list), 1)
message = message_list[0]
portal.organisation_module._delOb(organisation.id)
- activity_tool.invokeGroup('getTitle', [message])
+ activity_tool.invokeGroup('getTitle', [message], 'SQLDict')
checkMessage(message, KeyError)
activity_tool.manageCancel(message.object_path, message.method_id)
# 2: activity method does not exist when activity is executed
@@ -3156,7 +3175,8 @@ class TestCMFActivity(ERP5TypeTestCase,
message_list = activity_tool.getMessageList()
self.assertEqual(len(message_list), 1)
message = message_list[0]
- activity_tool.invokeGroup('this_method_does_not_exist', [message])
+ activity_tool.invokeGroup('this_method_does_not_exist',
+ [message], 'SQLDict')
checkMessage(message, KeyError)
activity_tool.manageCancel(message.object_path, message.method_id)
@@ -3738,7 +3758,6 @@ class TestCMFActivity(ERP5TypeTestCase,
LOG('Testing... ',0,message)
self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict')
- @expectedFailure
def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
@@ -3779,7 +3798,49 @@ class TestCMFActivity(ERP5TypeTestCase,
activity_tool.manageClearActivities(keep=0)
finally:
SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
-
+
+ def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self):
+ activity_tool = self.portal.portal_activities
+ obj1 = activity_tool.newActiveProcess()
+ obj2 = activity_tool.newActiveProcess()
+ transaction.commit()
+ self.tic()
+ group_method_call_list = []
+ def doSomething(self, message_list):
+ group_method_call_list.append(sorted((obj.getPath(), args, kw)
+ for obj, args, kw in message_list))
+ del message_list[:]
+ activity_tool.__class__.doSomething = doSomething
+ try:
+ for activity in 'SQLDict', 'SQLQueue':
+ activity_kw = dict(activity=activity, serialization_tag=self.id(),
+ group_method_id='portal_activities/doSomething')
+ obj1.activate(**activity_kw).dummy(1, x=None)
+ obj2.activate(**activity_kw).dummy(2, y=None)
+ transaction.commit()
+ activity_tool.distribute()
+ activity_tool.tic()
+ self.assertEqual(group_method_call_list.pop(),
+ sorted([(obj1.getPath(), (1,), dict(x=None)),
+ (obj2.getPath(), (2,), dict(y=None))]))
+ self.assertFalse(group_method_call_list)
+ self.assertFalse(activity_tool.getMessageList())
+ obj1.activate(priority=2, **activity_kw).dummy1(1, x=None)
+ obj1.activate(priority=1, **activity_kw).dummy2(2, y=None)
+ message1 = obj1.getPath(), (1,), dict(x=None)
+ message2 = obj1.getPath(), (2,), dict(y=None)
+ transaction.commit()
+ activity_tool.distribute()
+ self.assertEqual(len(activity_tool.getMessageList()), 2)
+ activity_tool.tic()
+ self.assertEqual(group_method_call_list.pop(),
+ dict(SQLDict=[message2],
+ SQLQueue=[message1, message2])[activity])
+ self.assertFalse(group_method_call_list)
+ self.assertFalse(activity_tool.getMessageList())
+ finally:
+ del activity_tool.__class__.doSomething
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
Modified: erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/ERP5Catalog/CatalogTool.py [utf8] Fri Aug 6 18:36:16 2010
@@ -759,6 +759,23 @@ class CatalogTool (UniqueObject, ZCatalo
self.catalog_object(object, url, idxs=idxs, sql_catalog_id=sql_catalog_id,**kw)
+ def catalogObjectList(self, object_list, *args, **kw):
+ """Catalog a list of objects"""
+ if type(object_list[0]) is tuple:
+ tmp_object_list = [x[0] for x in object_list]
+ ZCatalog.catalogObjectList(self, tmp_object_list, **x[2])
+ # keep failed objects in 'object_list'
+ object_list[:] = [x for x in object_list if x[0] in tmp_object_list]
+ else:
+ ZCatalog.catalogObjectList(self, object_list, *args, **kw)
+
+ security.declarePrivate('uncatalogObjectList')
+ def uncatalogObjectList(self, message_list):
+ """Uncatalog a list of objects"""
+ for obj, args, kw in message_list:
+ self.unindexObject(*args, **kw)
+ del message_list[:]
+
security.declarePrivate('unindexObject')
def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None):
"""
Modified: erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/ERP5Type/CopySupport.py [utf8] Fri Aug 6 18:36:16 2010
@@ -375,14 +375,12 @@ class CopyContainer:
catalog.beforeUnindexObject(None,path=path,uid=uid)
# Then start activity in order to remove lines in catalog,
# sql which generates locks
- if path is None:
- path = self.getPath()
# - serialization_tag is used in order to prevent unindexation to
# happen before/in parallel with reindexations of the same object.
catalog.activate(activity='SQLQueue',
tag='%s' % uid,
- serialization_tag=self.getRootDocumentPath()).unindexObject(None,
- path=path,uid=uid)
+ group_method_id='portal_catalog/uncatalogObjectList',
+ serialization_tag=self.getRootDocumentPath()).unindexObject(uid=uid)
security.declareProtected(Permissions.ModifyPortalContent, 'moveObject')
def moveObject(self, idxs=None):
Modified: erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py
URL: http://svn.erp5.org/erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py?rev=37609&r1=37390&r2=37609&view=diff
==============================================================================
--- erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py [utf8] (original)
+++ erp5/tags/version-5.4.6/products/ERP5Type/tests/testCopySupport.py [utf8] Fri Aug 6 18:36:16 2010
@@ -31,6 +31,7 @@ import transaction
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
#from AccessControl.SecurityManagement import newSecurityManager
+from Products.CMFActivity.ActivityTool import ActivityTool
from Products.CMFActivity.Errors import ActivityPendingError
class TestCopySupport(ERP5TypeTestCase):
@@ -100,13 +101,49 @@ class TestCopySupport(ERP5TypeTestCase):
# Currently, the test passes only because ActivityTool.distribute always
# iterates on queues in the same order: SQLQueue before SQLDict.
# If Python returned dictionary values in a different order,
- # reindex activities fail with the following error:
+ # reindex activities would fail with the following error:
# uid of <Products.ERP5Catalog.CatalogTool.IndexableObjectWrapper for
# /erp5/person_module/1/old_address> is 599L and is already assigned
# to deleted in catalog !!! This can be fatal.
# This test would also fail if SQLDict was used for 'unindexObject'.
self.tic()
+ def test_03_unindexObjectGrouping(self):
+ person = self.portal.person_module.newContent(portal_type='Person',
+ address_city='Lille',
+ email_text='foo@bar.com')
+ transaction.commit()
+ self.tic()
+ search_catalog = self.portal.portal_catalog.unrestrictedSearchResults
+ uid_list = [person.getUid(),
+ person.default_address.getUid(),
+ person.default_email.getUid()]
+ uid_list.sort()
+ self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
+ self.portal.person_module._delObject(person.getId())
+ del person
+ transaction.commit()
+ self.assertEqual(len(search_catalog(uid=uid_list)), len(uid_list))
+ activity_tool = self.portal.portal_activities
+ self.assertEqual(len(activity_tool.getMessageList()), len(uid_list))
+
+ ActivityTool_invokeGroup = ActivityTool.invokeGroup
+ invokeGroup_list = []
+ def invokeGroup(self, method_id, message_list, activity):
+ invokeGroup_list.extend((method_id,
+ sorted(m.kw.get('uid') for m in message_list),
+ activity))
+ return ActivityTool_invokeGroup(self, method_id, message_list, activity)
+ try:
+ ActivityTool.invokeGroup = invokeGroup
+ self.tic()
+ finally:
+ ActivityTool.invokeGroup = ActivityTool_invokeGroup
+ self.assertEqual(invokeGroup_list,
+ ['portal_catalog/uncatalogObjectList', uid_list, 'SQLQueue'])
+ self.assertEqual(len(search_catalog(uid=uid_list)), 0)
+
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCopySupport))
More information about the Erp5-report
mailing list