[Erp5-report] r32876 jm - in /erp5/trunk/products/CMFActivity: Activity/ skins/activity/ te...
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Feb 19 18:55:56 CET 2010
Author: jm
Date: Fri Feb 19 18:55:56 2010
New Revision: 32876
URL: http://svn.erp5.org?rev=32876&view=rev
Log:
Refactoring: move finalizeMessageExecution to SQLBase
Added:
erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql
- copied, changed from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql
- copied, changed from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql
- copied, changed from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
Removed:
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLBase.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] Fri Feb 19 18:55:56 2010
@@ -26,8 +26,17 @@
#
##############################################################################
-from zLOG import LOG, INFO, WARNING
+import sys
+from zLOG import LOG, TRACE, INFO, WARNING, ERROR
from ZODB.POSException import ConflictError
+from Products.CMFActivity.ActivityTool import (
+ MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
+from Products.CMFActivity.ActiveObject import (
+ INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
+from Queue import VALIDATION_ERROR_DELAY
+
+MAX_PRIORITY = 5
+
class SQLBase:
"""
@@ -74,19 +83,14 @@
priority = default
return priority
- def _retryOnLockError(self, method, args=(), kw=None):
- if kw is None:
- kw = {}
+ def _retryOnLockError(self, method, args=(), kw={}):
while True:
try:
- result = method(*args, **kw)
+ return method(*args, **kw)
except ConflictError:
# Note that this code assumes that a database adapter translates
# a lock error into a conflict error.
LOG('SQLBase', INFO, 'Got a lock error, retrying...')
- else:
- break
- return result
def _validate_after_method_id(self, activity_tool, message, value):
return self._validate(activity_tool, method_id=value)
@@ -117,3 +121,115 @@
def _validate_serialization_tag(self, activity_tool, message, value):
return self._validate(activity_tool, serialization_tag=value)
+
+ def _log(self, severity, summary):
+ LOG(self.__class__.__name__, severity, summary,
+ error=severity>INFO and sys.exc_info() or None)
+
+ def finalizeMessageExecution(self, activity_tool, message_list,
+ uid_to_duplicate_uid_list_dict=None):
+ """
+ If everything was fine, delete all messages.
+ If anything failed, make successful messages available (if any), and
+ the following rules apply to failed messages:
+ - Failures due to ConflictErrors cause messages to be postponed,
+ but their priority is *not* increased.
+ - Failures of messages already above maximum priority cause them to
+ be put in a permanent-error state.
+ - In all other cases, priority is increased and message is delayed.
+ """
+ deletable_uid_list = []
+ delay_uid_list = []
+ final_error_uid_list = []
+ make_available_uid_list = []
+ notify_user_list = []
+ non_executable_message_list = []
+ executed_uid_list = deletable_uid_list
+ if uid_to_duplicate_uid_list_dict is not None:
+ for m in message_list:
+ if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
+ executed_uid_list = make_available_uid_list
+ break
+ for m in message_list:
+ uid = m.uid
+ if m.getExecutionState() == MESSAGE_EXECUTED:
+ executed_uid_list.append(uid)
+ if uid_to_duplicate_uid_list_dict is not None:
+ executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
+ elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
+ # Should duplicate messages follow strictly the original message, or
+ # should they be just made available again ?
+ if uid_to_duplicate_uid_list_dict is not None:
+ make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
+ priority = m.line.priority
+ # BACK: Only exceptions can be classes in Python 2.6.
+ # Once we drop support for Python 2.4,
+ # please, remove the "type(m.exc_type) is type(ConflictError)" check
+ # and leave only the "issubclass(m.exc_type, ConflictError)" check.
+ if type(m.exc_type) is type(ConflictError) and \
+ issubclass(m.exc_type, ConflictError):
+ delay_uid_list.append(uid)
+ elif priority > MAX_PRIORITY:
+ notify_user_list.append(m)
+ final_error_uid_list.append(uid)
+ else:
+ try:
+ # Immediately update, because values different for every message
+ activity_tool.SQLBase_reactivate(table=self.sql_table,
+ uid=[uid],
+ delay=None,
+ priority=priority + 1)
+ except:
+ self._log(WARNING, 'Failed to increase priority of %r' % uid)
+ delay_uid_list.append(uid)
+ else:
+ # Internal CMFActivity error: the message can not be executed because
+ # something is missing (context object cannot be found, method cannot
+ # be accessed on object).
+ non_executable_message_list.append(uid)
+ if deletable_uid_list:
+ try:
+ self._retryOnLockError(activity_tool.SQLBase_delMessage,
+ kw={'table': self.sql_table,
+ 'uid': deletable_uid_list})
+ except:
+ self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
+ else:
+ self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
+ if delay_uid_list:
+ try:
+ # If this is a conflict error, do not lower the priority but only delay.
+ activity_tool.SQLBase_reactivate(table=self.sql_table,
+ uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None)
+ except:
+ self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
+ make_available_uid_list += delay_uid_list
+ if final_error_uid_list:
+ try:
+ activity_tool.SQLBase_assignMessage(table=self.sql_table,
+ uid=final_error_uid_list, processing_node=INVOKE_ERROR_STATE)
+ except:
+ self._log(ERROR, 'Failed to set message to error state for %r'
+ % final_error_uid_list)
+ if non_executable_message_list:
+ try:
+ activity_tool.SQLBase_assignMessage(table=self.sql_table,
+ uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
+ except:
+ self._log(ERROR, 'Failed to set message to invalid path state for %r'
+ % non_executable_message_list)
+ if make_available_uid_list:
+ try:
+ self.makeMessageListAvailable(activity_tool=activity_tool,
+ uid_list=make_available_uid_list)
+ except:
+ self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
+ else:
+ self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
+ try:
+ for m in notify_user_list:
+ m.notifyUser(activity_tool)
+ except:
+ # Notification failures must not cause this method to raise.
+ self._log(WARNING,
+ 'Exception during notification phase of finalizeMessageExecution')
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Fri Feb 19 18:55:56 2010
@@ -27,8 +27,7 @@
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
-from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
- abortTransactionSynchronously
+from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
@@ -47,7 +46,6 @@
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
-MAX_PRIORITY = 5
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read up to this number of messages to validate.
@@ -64,6 +62,8 @@
and provide sequentiality. Should not create conflict
because use of OOBTree.
"""
+ sql_table = 'message'
+
# Transaction commit methods
def prepareQueueMessageList(self, activity_tool, message_list):
message_list = [m for m in message_list if m.is_registered]
@@ -104,7 +104,7 @@
order_validation_text = order_validation_text)
uid_list = [x.uid for x in uid_list]
if len(uid_list)>0:
- activity_tool.SQLDict_delMessage(uid = uid_list)
+ activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
def finishQueueMessage(self, activity_tool_path, m):
# Nothing to do in SQLDict.
@@ -130,26 +130,6 @@
def getRegisteredMessageList(self, activity_buffer, activity_tool):
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
-
- def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
- validation_state = message.validate(self, activity_tool, check_order_validation=0)
- if validation_state is not VALID:
- # There is a serious validation error - we must lower priority
- if priority > MAX_PRIORITY:
- # This is an error
- if len(uid_list) > 0:
- activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=VALIDATE_ERROR_STATE)
- # Assign message back to 'error' state
- #m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
- else:
- # Lower priority
- if len(uid_list) > 0: # Add some delay before new processing
- activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
- priority=priority + 1, retry=1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
- return 0
- return 1
def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
"""
@@ -320,105 +300,6 @@
else:
LOG('SQLDict', TRACE, '(no message was reserved)')
return [], 0, None, {}
-
- def finalizeMessageExecution(self, activity_tool, message_list, uid_to_duplicate_uid_list_dict):
- """
- If everything was fine, delete all messages.
- If anything failed, make successful messages available (if any), and
- the following rules apply to failed messages:
- - Failures due to ConflictErrors cause messages to be postponed,
- but their priority is *not* increased.
- - Failures of messages already above maximum priority cause them to
- be put in a permanent-error state.
- - In all other cases, priority is increased and message is delayed.
- """
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- deletable_uid_list = []
- delay_uid_list = []
- final_error_uid_list = []
- make_available_uid_list = []
- notify_user_list = []
- non_executable_message_list = []
- something_failed = (len([m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0)
- for m in message_list:
- uid = m.uid
- if m.getExecutionState() == MESSAGE_EXECUTED:
- if something_failed:
- make_available_uid_list.append(uid)
- make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
- else:
- deletable_uid_list.append(uid)
- deletable_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
- elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
- # Should duplicate messages follow strictly the original message, or
- # should they be just made available again ?
- make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
- priority = m.line.priority
- # BACK: Only exceptions can be classes in Python 2.6.
- # Once we drop support for Python 2.4,
- # please, remove the "type(m.exc_type) is type(ConflictError)" check
- # and leave only the "issubclass(m.exc_type, ConflictError)" check.
- if type(m.exc_type) is type(ConflictError) and \
- issubclass(m.exc_type, ConflictError):
- delay_uid_list.append(uid)
- elif priority > MAX_PRIORITY:
- notify_user_list.append(m)
- final_error_uid_list.append(uid)
- else:
- try:
- # Immediately update, because values different for every message
- activity_tool.SQLDict_setPriority(
- uid=[uid],
- delay=None,
- retry=None,
- priority=priority + 1)
- except:
- LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
- delay_uid_list.append(uid)
- else:
- # Internal CMFActivity error: the message can not be executed because
- # something is missing (context object cannot be found, method cannot
- # be accessed on object).
- non_executable_message_list.append(uid)
- if len(deletable_uid_list):
- try:
- self._retryOnLockError(activity_tool.SQLDict_delMessage, kw={'uid': deletable_uid_list})
- except:
- LOG('SQLDict', ERROR, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLDict', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
- if len(delay_uid_list):
- try:
- # If this is a conflict error, do not lower the priority but only delay.
- activity_tool.SQLDict_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None, retry=None)
- except:
- LOG('SQLDict', ERROR, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
- make_available_uid_list += delay_uid_list
- if len(final_error_uid_list):
- try:
- activity_tool.SQLDict_assignMessage(uid=final_error_uid_list,
- processing_node=INVOKE_ERROR_STATE)
- except:
- LOG('SQLDict', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
- if len(non_executable_message_list):
- try:
- activity_tool.SQLDict_assignMessage(uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
- except:
- LOG('SQLDict', ERROR, 'Failed to set message to invalid path state for %r' % (non_executable_message_list, ), error=sys.exc_info())
- if len(make_available_uid_list):
- try:
- makeMessageListAvailable(make_available_uid_list)
- except:
- LOG('SQLDict', ERROR, 'Failed to unreserve %r' % (make_available_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLDict', TRACE, 'Freed messages %r' % (make_available_uid_list, ))
- try:
- for m in notify_user_list:
- m.notifyUser(activity_tool)
- except:
- # Notification failures must not cause this method to raise.
- LOG('SQLDict', WARNING, 'Exception during notification phase of finalizeMessageExecution', error=sys.exc_info())
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
@@ -595,7 +476,8 @@
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
order_validation_text=None)
if len(uid_list)>0:
- activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list])
+ activity_tool.SQLBase_delMessage(table=self.sql_table,
+ uid=[x.uid for x in uid_list])
getMessageList = SQLBase.getMessageList
@@ -672,11 +554,13 @@
if group_method_id is not None:
serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
if deletable_uid_list:
- activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
+ activity_tool.SQLBase_delMessage(table=self.sql_table,
+ uid=deletable_uid_list)
distributable_count = len(message_dict)
if distributable_count:
- activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
+ activity_tool.SQLBase_assignMessage(table=self.sql_table,
+ processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
validated_count += distributable_count
if validated_count < MAX_VALIDATED_LIMIT:
offset += READ_MESSAGE_LIMIT
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Fri Feb 19 18:55:56 2010
@@ -28,8 +28,7 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue
-from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
- abortTransactionSynchronously
+from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
@@ -48,7 +47,6 @@
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
-MAX_PRIORITY = 5
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
@@ -73,6 +71,7 @@
and provide sequentiality. Should not create conflict
because use of OOBTree.
"""
+ sql_table = 'message_queue'
def prepareQueueMessageList(self, activity_tool, message_list):
message_list = [m for m in message_list if m.is_registered]
@@ -103,7 +102,7 @@
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
#LOG("prepareDeleteMessage", 0, str(m.__dict__))
- activity_tool.SQLQueue_delMessage(uid = [m.uid])
+ activity_tool.SQLBase_delMessage(table=self.sql_table, uid=[m.uid])
def finishQueueMessage(self, activity_tool_path, m):
# Nothing to do in SQLQueue.
@@ -198,88 +197,6 @@
else:
LOG('SQLQueue', TRACE, '(no message was reserved)')
return []
-
- def finalizeMessageExecution(self, activity_tool, message_list):
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- deletable_uid_list = []
- delay_uid_list = []
- final_error_uid_list = []
- notify_user_list = []
- non_executable_message_list = []
- for m in message_list:
- uid = m.uid
- if m.getExecutionState() == MESSAGE_EXECUTED:
- deletable_uid_list.append(uid)
- elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
- priority = m.line.priority
- # BACK: Only exceptions can be classes in Python 2.6.
- # Once we drop support for Python 2.4,
- # please, remove the "type(m.exc_type) is type(ConflictError)" check
- # and leave only the "issubclass(m.exc_type, ConflictError)" check.
- if type(m.exc_type) is type(ConflictError) and \
- issubclass(m.exc_type, ConflictError):
- delay_uid_list.append(uid)
- elif priority > MAX_PRIORITY:
- notify_user_list.append(m)
- final_error_uid_list.append(uid)
- else:
- try:
- # Immediately update, because values different for every message
- activity_tool.SQLQueue_setPriority(
- uid=[uid],
- delay=VALIDATION_ERROR_DELAY,
- priority=priority + 1)
- except:
- LOG('SQLQueue', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
- try:
- makeMessageListAvailable(delay_uid_list)
- except:
- LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
- else:
- LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, ))
- else:
- # Internal CMFActivity error: the message can not be executed because
- # something is missing (context object cannot be found, method cannot
- # be accessed on object).
- non_executable_message_list.append(uid)
- if len(deletable_uid_list):
- try:
- self._retryOnLockError(activity_tool.SQLQueue_delMessage, kw={'uid': deletable_uid_list})
- except:
- LOG('SQLQueue', ERROR, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLQueue', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
- if len(delay_uid_list):
- try:
- # If this is a conflict error, do not lower the priority but only delay.
- activity_tool.SQLQueue_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None)
- except:
- LOG('SQLQueue', ERROR, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
- try:
- makeMessageListAvailable(delay_uid_list)
- except:
- LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLQueue', TRACE, 'Freed messages %r' % (delay_uid_list, ))
- if len(final_error_uid_list):
- try:
- activity_tool.SQLQueue_assignMessage(uid=final_error_uid_list,
- processing_node=INVOKE_ERROR_STATE)
- except:
- LOG('SQLQueue', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
- if len(non_executable_message_list):
- try:
- activity_tool.SQLQueue_assignMessage(uid=non_executable_message_list,
- processing_node=VALIDATE_ERROR_STATE)
- except:
- LOG('SQLQueue', ERROR, 'Failed to set message to invalid path state for %r' % (final_error_uid_list, ), error=sys.exc_info())
- try:
- for m in notify_user_list:
- m.notifyUser(activity_tool)
- except:
- # Notification failures must not cause this method to raise.
- LOG('SQLQueue', WARNING, 'Exception during notification phase of finalizeMessageExecution', error=sys.exc_info())
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list):
@@ -430,7 +347,8 @@
'Could not validate %s on %s' % (m.method_id , path))
if len(result):
- activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
+ activity_tool.SQLBase_delMessage(table=self.sql_table,
+ uid=[line.uid for line in result])
getMessageList = SQLBase.getMessageList
@@ -489,7 +407,8 @@
validation_text_dict, now_date=now_date)
distributable_count = len(message_dict)
if distributable_count:
- activity_tool.SQLQueue_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
+ activity_tool.SQLBase_assignMessage(table=self.sql_table,
+ processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
validated_count += distributable_count
if validated_count < MAX_VALIDATED_LIMIT:
offset += READ_MESSAGE_LIMIT
Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql (from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql&r1=32875&r2=32876&rev=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql [utf8] Fri Feb 19 18:55:56 2010
@@ -7,17 +7,16 @@
class_name:
class_file:
</dtml-comment>
-<params>
+<params>table
processing_node
-uid
+uid:list
</params>
-UPDATE message
+UPDATE
+ <dtml-var table>
SET
processing_node=<dtml-sqlvar processing_node type="int">,
processing=0
WHERE
- uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-)
+ <dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql (from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql&r1=32875&r2=32876&rev=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql [utf8] Fri Feb 19 18:55:56 2010
@@ -7,9 +7,10 @@
class_name:
class_file:
</dtml-comment>
-<params>uid:list</params>
+<params>table
+uid:list
+</params>
DELETE FROM
- message
+ <dtml-var table>
WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+ <dtml-sqltest uid type="int" multiple>
Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql (from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql&r1=32875&r2=32876&rev=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql [utf8] Fri Feb 19 18:55:56 2010
@@ -7,25 +7,20 @@
class_name:
class_file:
</dtml-comment>
-<params>uid:list
+<params>table
+uid:list
priority
-retry
delay
</params>
UPDATE
- message
+ <dtml-var table>
SET
processing = 0
<dtml-if expr="priority is not None">
, priority = <dtml-sqlvar priority type="int">
</dtml-if>
<dtml-if expr="delay is not None">
- , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> * (retry + 1) SECOND)
-</dtml-if>
-<dtml-if expr="retry is not None">
- , retry = retry + <dtml-sqlvar retry type="int">
+ , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
</dtml-if>
WHERE
- uid IN (
- <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
- )
+ <dtml-sqltest uid type="int" multiple>
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql (removed)
@@ -1,23 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>
-processing_node
-uid
-</params>
-UPDATE message
-SET
- processing_node=<dtml-sqlvar processing_node type="int">,
- processing=0
-WHERE
- uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-)
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql (removed)
@@ -1,15 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid:list</params>
-DELETE FROM
- message
-WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql (removed)
@@ -1,31 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid:list
-priority
-retry
-delay
-</params>
-UPDATE
- message
-SET
- processing = 0
-<dtml-if expr="priority is not None">
- , priority = <dtml-sqlvar priority type="int">
-</dtml-if>
-<dtml-if expr="delay is not None">
- , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> * (retry + 1) SECOND)
-</dtml-if>
-<dtml-if expr="retry is not None">
- , retry = retry + <dtml-sqlvar retry type="int">
-</dtml-if>
-WHERE
- uid IN (
- <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
- )
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql (removed)
@@ -1,22 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>
-processing_node
-uid</params>
-UPDATE message_queue
-SET
- processing_node=<dtml-sqlvar processing_node type="int">,
- processing=0
-WHERE
- uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-)
-<dtml-var sql_delimiter>
-COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql (removed)
@@ -1,15 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid</params>
-DELETE FROM
- message_queue
-WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql (removed)
@@ -1,27 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid
-priority
-delay
-</params>
-UPDATE
- message_queue
-SET
- processing = 0
- <dtml-if expr="priority is not None">
- , priority = <dtml-sqlvar priority type="int">
- </dtml-if>
- <dtml-if expr="delay is not None">
- , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
- </dtml-if>
-WHERE
- uid IN (
- <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
- )
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Feb 19 18:55:56 2010
@@ -35,6 +35,7 @@
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
+from Products.CMFActivity.Activity.SQLDict import SQLDict
from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError
#from Products.ERP5Type.Document.Organisation import Organisation
# The above cannot be imported at top level because it doesn't exist until
@@ -3512,7 +3513,8 @@
1)
finally:
# Clear activities from all nodes
- activity_tool.SQLDict_delMessage(uid=[message.uid for message in result])
+ activity_tool.SQLBase_delMessage(table=SQLDict.sql_table,
+ uid=[message.uid for message in result])
get_transaction().commit()
def test_116_RaiseInCommitBeforeMessageExecution(self):
More information about the Erp5-report
mailing list