[Erp5-report] r37686 jm - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/...
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Aug 11 10:35:15 CEST 2010
Author: jm
Date: Wed Aug 11 10:35:15 2010
New Revision: 37686
URL: http://svn.erp5.org?rev=37686&view=rev
Log:
CMFActivity: add message grouping support for processing SQLQueue
Added:
erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
- copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
- copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
- copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
- copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
Removed:
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLBase.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/ActivityTool.py
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] Wed Aug 11 10:35:15 2010
@@ -148,12 +148,15 @@ class SQLBase:
This number is guaranted not to be exceeded.
If None (or not given) no limit apply.
"""
- result = not group_method_id and \
- activity_tool.SQLDict_selectReservedMessageList(
- processing_node=processing_node, count=limit)
+ select = activity_tool.SQLBase_selectReservedMessageList
+ result = not group_method_id and select(table=self.sql_table, count=limit,
+ processing_node=processing_node)
if not result:
- activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
- result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
+ activity_tool.SQLBase_reserveMessageList(table=self.sql_table,
+ count=limit, processing_node=processing_node, to_date=date,
+ group_method_id=group_method_id)
+ result = select(table=self.sql_table,
+ processing_node=processing_node, count=limit)
return result
def makeMessageListAvailable(self, activity_tool, uid_list):
@@ -161,7 +164,8 @@ class SQLBase:
Put messages back in processing_node=0 .
"""
if len(uid_list):
- activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
+ activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
+ uid=uid_list)
def getProcessableMessageList(self, activity_tool, processing_node):
"""
@@ -222,7 +226,7 @@ class SQLBase:
m = self.loadMessage(line.message, uid=uid, line=line)
message_list.append(m)
group_method_id = line.group_method_id
- activity_tool.SQLDict_processMessage(uid=[uid])
+ activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
.extend(getDuplicateMessageUidList(line))
if group_method_id not in (None, '', '\0'):
@@ -243,22 +247,26 @@ class SQLBase:
# We read each line once so lines have distinct uids.
# So what remains to be filtered on are path, method_id and
# order_validation_text.
- key = (line.path, line.method_id, line.order_validation_text)
- original_uid = path_and_method_id_dict.get(key)
- if original_uid is not None:
- uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
- .append(line.uid)
- continue
- path_and_method_id_dict[key] = line.uid
- uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
- .extend(getDuplicateMessageUidList(line))
+ try:
+ key = line.path, line.method_id, line.order_validation_text
+ except AttributeError:
+ pass # message_queue does not have 'order_validation_text'
+ else:
+ original_uid = path_and_method_id_dict.get(key)
+ if original_uid is not None:
+ uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
+ .append(line.uid)
+ continue
+ path_and_method_id_dict[key] = line.uid
+ uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
+ .extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS:
m = self.loadMessage(line.message, uid=line.uid, line=line)
count += len(m.getObjectList(activity_tool))
message_list.append(m)
else:
unreserve_uid_list.append(line.uid)
- activity_tool.SQLDict_processMessage(
+ activity_tool.SQLBase_processMessage(table=self.sql_table,
uid=[m.uid for m in message_list])
# Unreserve extra messages as soon as possible.
self.makeMessageListAvailable(activity_tool=activity_tool,
@@ -298,7 +306,8 @@ class SQLBase:
group_method_id = group_method_id.split('\0')[0]
if group_method_id not in (None, ""):
method = activity_tool.invokeGroup
- args = (group_method_id, message_list)
+ args = (group_method_id, message_list, self.__class__.__name__,
+ self.merge_duplicate)
activity_runtime_environment = ActivityRuntimeEnvironment(None)
else:
method = activity_tool.invoke
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Wed Aug 11 10:35:15 2010
@@ -52,6 +52,7 @@ class SQLDict(RAMDict, SQLBase):
because use of OOBTree.
"""
sql_table = 'message'
+ merge_duplicate = True
# Transaction commit methods
def prepareQueueMessageList(self, activity_tool, message_list):
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Wed Aug 11 10:35:15 2010
@@ -29,15 +29,9 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH
-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
-from types import ClassType
-import sys
-from time import time
from SQLBase import SQLBase, sort_message_key
-from Products.CMFActivity.ActivityRuntimeEnvironment import (
- ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter
import transaction
@@ -48,17 +42,6 @@ from zLOG import LOG, WARNING, ERROR, IN
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
-# Process this many messages in each dequeueMessage call.
-# Downside of setting to a "small" value: the cost of reserving a batch of
-# few messages increases relatively to the cost of executing activities,
-# making CMFActivity overhead significant.
-# Downside of setting to a "big" value: if there are many slow activities in
-# a multi-activity-node environment, multiple slow activities will be reserved
-# by a single node, making a suboptimal use of the parallelisation offered by
-# the cluster.
-# Before increasing this value, consider using SQLDict with group methods
-# first.
-MESSAGE_BUNDLE_SIZE = 1
MAX_MESSAGE_LIST_SIZE = 100
@@ -69,6 +52,7 @@ class SQLQueue(RAMQueue, SQLBase):
because use of OOBTree.
"""
sql_table = 'message_queue'
+ merge_duplicate = False
def prepareQueueMessageList(self, activity_tool, message_list):
message_list = [m for m in message_list if m.is_registered]
@@ -83,6 +67,8 @@ class SQLQueue(RAMQueue, SQLBase):
method_id_list = [m.method_id for m in registered_message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list]
date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list]
+ group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
+ for message in registered_message_list]
tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list]
dumped_message_list = [self.dumpMessage(m) for m in registered_message_list]
@@ -92,6 +78,7 @@ class SQLQueue(RAMQueue, SQLBase):
method_id_list=method_id_list,
priority_list=priority_list,
message_list=dumped_message_list,
+ group_method_id_list = group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=None,
@@ -110,162 +97,14 @@ class SQLQueue(RAMQueue, SQLBase):
# Nothing to do in SQLQueue.
pass
- def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
+ def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
- Get and reserve a list of messages.
- limit
- Maximum number of messages to fetch.
- This number is not garanted to be reached, because of:
- - not enough messages being pending execution
- - race condition (other nodes reserving the same messages at the same
- time)
- This number is guaranted not to be exceeded.
- If None (or not given) no limit apply.
+ Reserve unreserved messages matching given line.
+ Return their uids.
"""
- result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
- if len(result) == 0:
- activity_tool.SQLQueue_reserveMessageList(count=limit, processing_node=processing_node, to_date=date)
- result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
- return result
-
- def makeMessageListAvailable(self, activity_tool, uid_list):
- """
- Put messages back in processing_node=0 .
- """
- if len(uid_list):
- activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
-
- def getProcessableMessageList(self, activity_tool, processing_node):
- """
- Always true:
- For each reserved message, delete redundant messages when it gets
- reserved (definitely lost, but they are expandable since redundant).
-
- - reserve a message
- - set reserved message to processing=1 state
- - if this message has a group_method_id:
- - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- - get one message from the reserved bunch (this messages will be
- "needed")
- - increase the number of impacted object
- - set "needed" reserved messages to processing=1 state
- - unreserve "unneeded" messages
- - return still-reserved message list
-
- If any error happens in above described process, try to unreserve all
- messages already reserved in that process.
- If it fails, complain loudly that some messages might still be in an
- unclean state.
-
- Returned values:
- list of messages
- """
- def getReservedMessageList(limit):
- line_list = self.getReservedMessageList(activity_tool=activity_tool,
- date=now_date,
- processing_node=processing_node,
- limit=limit)
- if len(line_list):
- LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
- return line_list
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- now_date = self.getNow(activity_tool)
- message_list = []
- try:
- result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
- for line in result:
- m = self.loadMessage(line.message, uid=line.uid, line=line)
- message_list.append(m)
- if len(message_list):
- activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
- return message_list
- except:
- LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
- if len(message_list):
- to_free_uid_list = [m.uid for m in message_list]
- try:
- makeMessageListAvailable(to_free_uid_list)
- except:
- LOG('SQLQueue', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- if len(to_free_uid_list):
- LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
- else:
- LOG('SQLQueue', TRACE, '(no message was reserved)')
- return []
-
- def dequeueMessage(self, activity_tool, processing_node):
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- message_list = \
- self.getProcessableMessageList(activity_tool, processing_node)
- if message_list:
- processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
- processed_count = 0
- # Commit right before executing messages.
- # As MySQL transaction does not start exactly at the same time as ZODB
- # transactions but a bit later, messages available might be called
- # on objects which are not available - or available in an old
- # version - to ZODB connector.
- # So all connectors must be committed now that we have selected
- # everything needed from MySQL to get a fresh view of ZODB objects.
- transaction.commit()
- tv = getTransactionalVariable(None)
- for m in message_list:
- tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
- processed_count += 1
- # Try to invoke
- try:
- activity_tool.invoke(m)
- if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
- # Commit so that if a message raises it doesn't causes previous
- # successfull messages to be rolled back. This commit might fail,
- # so it is protected the same way as activity execution by the
- # same "try" block.
- transaction.commit()
- else:
- # This message failed, abort.
- transaction.abort()
- except:
- value = m.uid, m.object_path, m.method_id
- LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
- try:
- transaction.abort()
- except:
- # Unfortunately, database adapters may raise an exception against abort.
- LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
- raise
- # We must make sure that the message is not set as executed.
- # It is possible that the message is executed but the commit
- # of the transaction fails
- m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
- # XXX Is it still useful to free message now that this node is able
- # to reselect it ?
- try:
- makeMessageListAvailable([m.uid])
- except:
- LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
- else:
- LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
- if time() > processing_stop_time:
- LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
- break
- # Release all unprocessed messages
- to_free_uid_list = [m.uid for m in message_list[processed_count:]]
- if to_free_uid_list:
- try:
- makeMessageListAvailable(to_free_uid_list)
- except:
- LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
- self.finalizeMessageExecution(activity_tool,
- message_list[:processed_count])
- transaction.commit()
- return not message_list
+ return ()
+ dequeueMessage = SQLBase.dequeueMessage
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] Wed Aug 11 10:35:15 2010
@@ -1172,7 +1172,7 @@ class ActivityTool (Folder, UniqueObject
for held in my_self.REQUEST._held:
self.REQUEST._hold(held)
- def invokeGroup(self, method_id, message_list):
+ def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
if self.activity_tracking:
activity_tracking_logger.info(
'invoking group messages: method_id=%s, paths=%s'
@@ -1202,20 +1202,22 @@ class ActivityTool (Folder, UniqueObject
else:
subobject_list = (obj,)
for subobj in subobject_list:
- path = subobj.getPath()
- if path not in path_set:
+ if merge_duplicate:
+ path = subobj.getPath()
+ if path in path_set:
+ continue
path_set.add(path)
- if alternate_method_id is not None \
- and hasattr(aq_base(subobj), alternate_method_id):
- # if this object is alternated,
- # generate a new single active object
- activity_kw = m.activity_kw.copy()
- activity_kw.pop('group_method_id', None)
- activity_kw.pop('group_id', None)
- active_obj = subobj.activate(**activity_kw)
- getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
- else:
- expanded_object_list.append((subobj, m.args, m.kw))
+ if alternate_method_id is not None \
+ and hasattr(aq_base(subobj), alternate_method_id):
+ # if this object is alternated,
+ # generate a new single active object
+ activity_kw = m.activity_kw.copy()
+ activity_kw.pop('group_method_id', None)
+ activity_kw.pop('group_id', None)
+ active_obj = subobj.activate(activity=activity, **activity_kw)
+ getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
+ else:
+ expanded_object_list.append((subobj, m.args, m.kw))
new_message_list.append((m, obj))
except:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,15 +7,14 @@ cache_time:0
class_name:
class_file:
</dtml-comment>
-<params>uid_list</params>
+<params>table
+uid</params>
UPDATE
- message
+ <dtml-var table>
SET
processing_node=0,
processing=0
WHERE
- uid IN (
- <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
- )
+ <dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,14 +7,14 @@ cache_time:0
class_name:
class_file:
</dtml-comment>
-<params>uid</params>
-UPDATE message
+<params>table
+uid</params>
+UPDATE
+ <dtml-var table>
SET
processing_date = UTC_TIMESTAMP(),
processing = 1
WHERE
- uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
- )
+ <dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,19 +7,22 @@ cache_time:0
class_name:
class_file:
</dtml-comment>
-<params>processing_node
+<params>table
+processing_node
to_date
count
group_method_id
</params>
UPDATE
- message
+ <dtml-var table>
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
processing_node=0
AND date <= <dtml-sqlvar to_date type="datetime">
- <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
+ <dtml-if expr="group_method_id is not None">
+ AND group_method_id = <dtml-sqlvar group_method_id type="string">
+ </dtml-if>
ORDER BY
<dtml-comment>
Explanation of the order by:
Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,12 +7,13 @@ cache_time:0
class_name:
class_file:
</dtml-comment>
-<params>processing_node
+<params>table
+processing_node
count</params>
SELECT
*
FROM
- message_queue
+ <dtml-var table>
WHERE
processing_node = <dtml-sqlvar processing_node type="int">
<dtml-if expr="count is not None">
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
- message
-SET
- processing_node=0,
- processing=0
-WHERE
- uid IN (
- <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
- )
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid</params>
-UPDATE message
-SET
- processing_date = UTC_TIMESTAMP(),
- processing = 1
-WHERE
- uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
- )
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql (removed)
@@ -1,36 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-group_method_id
-</params>
-UPDATE
- message
-SET
- processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
- processing_node=0
- AND date <= <dtml-sqlvar to_date type="datetime">
- <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
-ORDER BY
-<dtml-comment>
- Explanation of the order by:
- - priority must be respected (it is a feature)
- - when multiple nodes simultaneously try to fetch activities, they should not
- be given the same set of lines as it would cause all minus one to wait for
- a write lock (and be ultimately aborted), effectively serializing their
- action (so breaking paralellism).
- So we must force MySQL to update lines in a random order.
-</dtml-comment>
- priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
- *
-FROM
- message
-WHERE
- processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
- LIMIT <dtml-sqlvar count type="int">
-</dtml-if>
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0,
+ `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
- message_queue
-SET
- processing_node=0,
- processing=0
-WHERE
- uid IN (
- <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
- )
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql (removed)
@@ -1,34 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-</params>
-UPDATE
- message_queue
-SET
- processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
- processing_node=0
- AND date <= <dtml-sqlvar to_date type="datetime">
-ORDER BY
-<dtml-comment>
- Explanation of the order by:
- - priority must be respected (it is a feature)
- - when multiple nodes simultaneously try to fetch activities, they should not
- be given the same set of lines as it would cause all minus one to wait for
- a write lock (and be ultimately aborted), effectively serializing their
- action (so breaking paralellism).
- So we must force MySQL to update lines in a random order.
-</dtml-comment>
- priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
- *
-FROM
- message_queue
-WHERE
- processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
- LIMIT <dtml-sqlvar count type="int">
-</dtml-if>
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -15,11 +15,12 @@ message_list
priority_list
processing_node_list
date_list
+group_method_id_list
tag_list
serialization_tag_list
</params>
INSERT INTO message_queue
-(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, tag, serialization_tag, message)
+(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
@@ -32,6 +33,7 @@ VALUES
<dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
+ <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Wed Aug 11 10:35:15 2010
@@ -430,7 +430,7 @@ class TestCMFActivity(ERP5TypeTestCase,
def TryActiveProcessInsideActivity(self, activity):
"""
Try two levels with active_process, we create one first
- activity with an acitive process, then this new activity
+ activity with an active process, then this new activity
uses another active process
"""
portal = self.getPortal()
@@ -1872,27 +1872,26 @@ class TestCMFActivity(ERP5TypeTestCase,
getattr(organisation, 'uid')
- def test_80_CallWithGroupIdParamater(self, quiet=0, run=run_all_test):
- """
- Test that group_id parameter is used to separate execution of the same method
- """
+ def callWithGroupIdParamater(self, activity, quiet, run):
if not run: return
if not quiet:
- message = '\nTest Activity with group_id parameter'
+ message = '\nTest Activity with group_id parameter (%s)' % activity
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
# Defined a group method
+ foobar_list = []
def setFoobar(self, object_list):
+ foobar_list.append(len(object_list))
for obj, args, kw in object_list:
number = kw.get('number', 1)
if getattr(obj,'foobar', None) is not None:
obj.foobar = obj.foobar + number
else:
obj.foobar = number
- object_list[:] = []
+ del object_list[:]
from Products.ERP5Type.Document.Folder import Folder
Folder.setFoobar = setFoobar
@@ -1905,46 +1904,65 @@ class TestCMFActivity(ERP5TypeTestCase,
# Test group_method_id is working without group_id
for x in xrange(5):
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
transaction.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5)
portal.portal_activities.distribute()
portal.portal_activities.tic()
- self.assertEquals(1, organisation.getFoobar())
+ expected = dict(SQLDict=1, SQLQueue=5)[activity]
+ self.assertEquals(expected, organisation.getFoobar())
# Test group_method_id is working with one group_id defined
for x in xrange(5):
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5)
portal.portal_activities.distribute()
portal.portal_activities.tic()
- self.assertEquals(2, organisation.getFoobar())
+ self.assertEquals(expected * 2, organisation.getFoobar())
+
+ self.assertEquals([expected, expected], foobar_list)
+ del foobar_list[:]
# Test group_method_id is working with many group_id defined
for x in xrange(5):
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit()
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
transaction.commit()
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit()
- organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
+ organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
transaction.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),20)
portal.portal_activities.distribute()
portal.portal_activities.tic()
- self.assertEquals(11, organisation.getFoobar())
+ self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
+ organisation.getFoobar())
+ self.assertEquals(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity],
+ sorted(foobar_list))
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list), 0)
-
+
+ def test_80a_CallWithGroupIdParamaterSQLDict(self, quiet=0, run=run_all_test):
+ """
+ Test that group_id parameter is used to separate execution of the same method
+ """
+ self.callWithGroupIdParamater('SQLDict', quiet=quiet, run=run)
+
+ def test_80b_CallWithGroupIdParamaterSQLQueue(self, quiet=0,
+ run=run_all_test):
+ """
+ Test that group_id parameter is used to separate execution of the same method
+ """
+ self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
"""
@@ -3148,7 +3166,7 @@ class TestCMFActivity(ERP5TypeTestCase,
self.assertEqual(len(message_list), 1)
message = message_list[0]
portal.organisation_module._delOb(organisation.id)
- activity_tool.invokeGroup('getTitle', [message])
+ activity_tool.invokeGroup('getTitle', [message], 'SQLDict', True)
checkMessage(message, KeyError)
activity_tool.manageCancel(message.object_path, message.method_id)
# 2: activity method does not exist when activity is executed
@@ -3157,7 +3175,8 @@ class TestCMFActivity(ERP5TypeTestCase,
message_list = activity_tool.getMessageList()
self.assertEqual(len(message_list), 1)
message = message_list[0]
- activity_tool.invokeGroup('this_method_does_not_exist', [message])
+ activity_tool.invokeGroup('this_method_does_not_exist',
+ [message], 'SQLDict', True)
checkMessage(message, KeyError)
activity_tool.manageCancel(message.object_path, message.method_id)
@@ -3739,7 +3758,6 @@ class TestCMFActivity(ERP5TypeTestCase,
LOG('Testing... ',0,message)
self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict')
- @expectedFailure
def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
More information about the Erp5-report
mailing list