[Erp5-report] r37684 jm - in /erp5/trunk/products/CMFActivity/Activity: SQLBase.py SQLDict.py
nobody at svn.erp5.org
Wed Aug 11 10:34:32 CEST 2010
Author: jm
Date: Wed Aug 11 10:34:31 2010
New Revision: 37684
URL: http://svn.erp5.org?rev=37684&view=rev
Log:
CMFActivity refactoring: move code from SQLDict to SQLBase
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLBase.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=37684&r1=37683&r2=37684&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] Wed Aug 11 10:34:31 2010
@@ -27,14 +27,20 @@
##############################################################################
import sys
+import transaction
from zLOG import LOG, TRACE, INFO, WARNING, ERROR
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
+from Products.CMFActivity.ActivityRuntimeEnvironment import (
+ ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import VALIDATION_ERROR_DELAY
+# Stop electing more messages for processing if more than this number of
+# objects are impacted by elected messages.
+MAX_GROUPED_OBJECTS = 100
def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList
@@ -129,6 +135,230 @@ class SQLBase:
LOG(self.__class__.__name__, severity, summary,
error=severity>INFO and sys.exc_info() or None)
+ def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
+ """
+ Get and reserve a list of messages.
+ limit
+ Maximum number of messages to fetch.
+ This number is not garanted to be reached, because of:
+ - not enough messages being pending execution
+ - race condition (other nodes reserving the same messages at the same
+ time)
+ This number is guaranted not to be exceeded.
+ If None (or not given) no limit apply.
+ """
+ result = not group_method_id and \
+ activity_tool.SQLDict_selectReservedMessageList(
+ processing_node=processing_node, count=limit)
+ if not result:
+ activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
+ result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
+ return result
+
+ def makeMessageListAvailable(self, activity_tool, uid_list):
+ """
+ Put messages back in processing_node=0 .
+ """
+ if len(uid_list):
+ activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
+
+ def getProcessableMessageList(self, activity_tool, processing_node):
+ """
+ Always true:
+ For each reserved message, delete redundant messages when it gets
+ reserved (definitely lost, but they are expandable since redundant).
+
+ - reserve a message
+ - set reserved message to processing=1 state
+ - if this message has a group_method_id:
+ - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+ - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+ - get one message from the reserved bunch (this messages will be
+ "needed")
+ - increase the number of impacted object
+ - set "needed" reserved messages to processing=1 state
+ - unreserve "unneeded" messages
+ - return still-reserved message list and a group_method_id
+
+ If any error happens in above described process, try to unreserve all
+ messages already reserved in that process.
+ If it fails, complain loudly that some messages might still be in an
+ unclean state.
+
+ Returned values:
+ 4-tuple:
+ - list of messages
+ - impacted object count
+ - group_method_id
+ - uid_to_duplicate_uid_list_dict
+ """
+ def getReservedMessageList(limit, group_method_id=None):
+ line_list = self.getReservedMessageList(activity_tool=activity_tool,
+ date=now_date,
+ processing_node=processing_node,
+ limit=limit,
+ group_method_id=group_method_id)
+ if len(line_list):
+ LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
+ return line_list
+ def getDuplicateMessageUidList(line):
+ uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
+ line=line, processing_node=processing_node)
+ if len(uid_list):
+ LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
+ return uid_list
+ def makeMessageListAvailable(uid_list):
+ self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+ BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
+ now_date = self.getNow(activity_tool)
+ message_list = []
+ count = 0
+ group_method_id = None
+ try:
+ result = getReservedMessageList(limit=1)
+ uid_to_duplicate_uid_list_dict = {}
+ if len(result) > 0:
+ line = result[0]
+ uid = line.uid
+ m = self.loadMessage(line.message, uid=uid, line=line)
+ message_list.append(m)
+ group_method_id = line.group_method_id
+ activity_tool.SQLDict_processMessage(uid=[uid])
+ uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
+ .extend(getDuplicateMessageUidList(line))
+ if group_method_id not in (None, '', '\0'):
+ # Count the number of objects to prevent too many objects.
+ count += len(m.getObjectList(activity_tool))
+ if count < MAX_GROUPED_OBJECTS:
+ # Retrieve objects which have the same group method.
+ result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
+ path_and_method_id_dict = {}
+ unreserve_uid_list = []
+ for line in result:
+ if line.uid == uid:
+ continue
+ # All fetched lines have the same group_method_id and
+ # processing_node.
+ # Their dates are lower-than or equal-to now_date.
+ # We read each line once so lines have distinct uids.
+ # So what remains to be filtered on are path, method_id and
+ # order_validation_text.
+ key = (line.path, line.method_id, line.order_validation_text)
+ original_uid = path_and_method_id_dict.get(key)
+ if original_uid is not None:
+ uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
+ continue
+ path_and_method_id_dict[key] = line.uid
+ uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
+ if count < MAX_GROUPED_OBJECTS:
+ m = self.loadMessage(line.message, uid=line.uid, line=line)
+ count += len(m.getObjectList(activity_tool))
+ message_list.append(m)
+ else:
+ unreserve_uid_list.append(line.uid)
+ activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
+ # Unreserve extra messages as soon as possible.
+ makeMessageListAvailable(unreserve_uid_list)
+ return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
+ except:
+ LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
+ if len(message_list):
+ to_free_uid_list = [m.uid for m in message_list]
+ try:
+ makeMessageListAvailable(to_free_uid_list)
+ except:
+ LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+ else:
+ if len(to_free_uid_list):
+ LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+ else:
+ LOG('SQLDict', TRACE, '(no message was reserved)')
+ return [], 0, None, {}
+
+ # Queue semantic
+ def dequeueMessage(self, activity_tool, processing_node):
+ def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
+ final_uid_list = []
+ for uid in uid_list:
+ final_uid_list.append(uid)
+ final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+ self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
+ message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
+ self.getProcessableMessageList(activity_tool, processing_node)
+ if message_list:
+ # Remove group_id parameter from group_method_id
+ if group_method_id is not None:
+ group_method_id = group_method_id.split('\0')[0]
+ if group_method_id not in (None, ""):
+ method = activity_tool.invokeGroup
+ args = (group_method_id, message_list)
+ activity_runtime_environment = ActivityRuntimeEnvironment(None)
+ else:
+ method = activity_tool.invoke
+ message = message_list[0]
+ args = (message, )
+ activity_runtime_environment = ActivityRuntimeEnvironment(message)
+ # Commit right before executing messages.
+ # As MySQL transaction does not start exactly at the same time as ZODB
+ # transactions but a bit later, messages available might be called
+ # on objects which are not available - or available in an old
+ # version - to ZODB connector.
+ # So all connectors must be committed now that we have selected
+ # everything needed from MySQL to get a fresh view of ZODB objects.
+ transaction.commit()
+ tv = getTransactionalVariable(None)
+ tv['activity_runtime_environment'] = activity_runtime_environment
+ # Try to invoke
+ try:
+ method(*args)
+ except:
+ LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
+ try:
+ transaction.abort()
+ except:
+ # Unfortunately, database adapters may raise an exception against abort.
+ LOG('SQLDict', PANIC,
+ 'abort failed, thus some objects may be modified accidentally')
+ raise
+ # XXX Is it still useful to free messages now that this node is able
+ # to reselect them ?
+ to_free_uid_list = [x.uid for x in message_list]
+ try:
+ makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
+ except:
+ LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+ else:
+ LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
+ # Abort if something failed.
+ if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
+ endTransaction = transaction.abort
+ else:
+ endTransaction = transaction.commit
+ try:
+ endTransaction()
+ except:
+ LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
+ if endTransaction == transaction.abort:
+ LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
+ else:
+ try:
+ transaction.abort()
+ except:
+ LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
+ raise
+ exc_info = sys.exc_info()
+ for m in message_list:
+ m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
+ try:
+ makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
+ except:
+ LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
+ else:
+ LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
+ self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
+ transaction.commit()
+ return not message_list
+
def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None):
"""
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=37684&r1=37683&r2=37684&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Wed Aug 11 10:34:31 2010
@@ -29,16 +29,10 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH
from RAMDict import RAMDict
-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
-from ZODB.POSException import ConflictError
import sys
-from types import ClassType
#from time import time
from SQLBase import SQLBase, sort_message_key
-from Products.CMFActivity.ActivityRuntimeEnvironment import (
- ActivityRuntimeEnvironment, getTransactionalVariable)
-from zExceptions import ExceptionFormatter
import transaction
@@ -48,9 +42,6 @@ from zLOG import LOG, TRACE, WARNING, ER
MAX_VALIDATED_LIMIT = 1000
# Read up to this number of messages to validate.
READ_MESSAGE_LIMIT = 1000
-# Stop electing more messages for processing if more than this number of
-# objects are impacted by elected messages.
-MAX_GROUPED_OBJECTS = 100
MAX_MESSAGE_LIST_SIZE = 100
@@ -131,33 +122,6 @@ class SQLDict(RAMDict, SQLBase):
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
- def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
- """
- Get and reserve a list of messages.
- limit
- Maximum number of messages to fetch.
- This number is not garanted to be reached, because of:
- - not enough messages being pending execution
- - race condition (other nodes reserving the same messages at the same
- time)
- This number is guaranted not to be exceeded.
- If None (or not given) no limit apply.
- """
- result = not group_method_id and \
- activity_tool.SQLDict_selectReservedMessageList(
- processing_node=processing_node, count=limit)
- if not result:
- activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
- result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
- return result
-
- def makeMessageListAvailable(self, activity_tool, uid_list):
- """
- Put messages back in processing_node=0 .
- """
- if len(uid_list):
- activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
-
def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
Reserve unreserved messages matching given line.
@@ -188,202 +152,7 @@ class SQLDict(RAMDict, SQLBase):
raise
return uid_list
- def getProcessableMessageList(self, activity_tool, processing_node):
- """
- Always true:
- For each reserved message, delete redundant messages when it gets
- reserved (definitely lost, but they are expandable since redundant).
-
- - reserve a message
- - set reserved message to processing=1 state
- - if this message has a group_method_id:
- - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- - get one message from the reserved bunch (this messages will be
- "needed")
- - increase the number of impacted object
- - set "needed" reserved messages to processing=1 state
- - unreserve "unneeded" messages
- - return still-reserved message list and a group_method_id
-
- If any error happens in above described process, try to unreserve all
- messages already reserved in that process.
- If it fails, complain loudly that some messages might still be in an
- unclean state.
-
- Returned values:
- 4-tuple:
- - list of messages
- - impacted object count
- - group_method_id
- - uid_to_duplicate_uid_list_dict
- """
- def getReservedMessageList(limit, group_method_id=None):
- line_list = self.getReservedMessageList(activity_tool=activity_tool,
- date=now_date,
- processing_node=processing_node,
- limit=limit,
- group_method_id=group_method_id)
- if len(line_list):
- LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
- return line_list
- def getDuplicateMessageUidList(line):
- uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
- line=line, processing_node=processing_node)
- if len(uid_list):
- LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
- return uid_list
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
- now_date = self.getNow(activity_tool)
- message_list = []
- count = 0
- group_method_id = None
- try:
- result = getReservedMessageList(limit=1)
- uid_to_duplicate_uid_list_dict = {}
- if len(result) > 0:
- line = result[0]
- uid = line.uid
- m = self.loadMessage(line.message, uid=uid, line=line)
- message_list.append(m)
- group_method_id = line.group_method_id
- activity_tool.SQLDict_processMessage(uid=[uid])
- uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
- .extend(getDuplicateMessageUidList(line))
- if group_method_id not in (None, '', '\0'):
- # Count the number of objects to prevent too many objects.
- count += len(m.getObjectList(activity_tool))
- if count < MAX_GROUPED_OBJECTS:
- # Retrieve objects which have the same group method.
- result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
- path_and_method_id_dict = {}
- unreserve_uid_list = []
- for line in result:
- if line.uid == uid:
- continue
- # All fetched lines have the same group_method_id and
- # processing_node.
- # Their dates are lower-than or equal-to now_date.
- # We read each line once so lines have distinct uids.
- # So what remains to be filtered on are path, method_id and
- # order_validation_text.
- key = (line.path, line.method_id, line.order_validation_text)
- original_uid = path_and_method_id_dict.get(key)
- if original_uid is not None:
- uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
- continue
- path_and_method_id_dict[key] = line.uid
- uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
- if count < MAX_GROUPED_OBJECTS:
- m = self.loadMessage(line.message, uid=line.uid, line=line)
- count += len(m.getObjectList(activity_tool))
- message_list.append(m)
- else:
- unreserve_uid_list.append(line.uid)
- activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
- # Unreserve extra messages as soon as possible.
- makeMessageListAvailable(unreserve_uid_list)
- return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
- except:
- LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
- if len(message_list):
- to_free_uid_list = [m.uid for m in message_list]
- try:
- makeMessageListAvailable(to_free_uid_list)
- except:
- LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- if len(to_free_uid_list):
- LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
- else:
- LOG('SQLDict', TRACE, '(no message was reserved)')
- return [], 0, None, {}
-
- # Queue semantic
- def dequeueMessage(self, activity_tool, processing_node):
- def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
- final_uid_list = []
- for uid in uid_list:
- final_uid_list.append(uid)
- final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
- message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
- self.getProcessableMessageList(activity_tool, processing_node)
- if message_list:
- # Remove group_id parameter from group_method_id
- if group_method_id is not None:
- group_method_id = group_method_id.split('\0')[0]
- if group_method_id not in (None, ""):
- method = activity_tool.invokeGroup
- args = (group_method_id, message_list)
- activity_runtime_environment = ActivityRuntimeEnvironment(None)
- else:
- method = activity_tool.invoke
- message = message_list[0]
- args = (message, )
- activity_runtime_environment = ActivityRuntimeEnvironment(message)
- # Commit right before executing messages.
- # As MySQL transaction does not start exactly at the same time as ZODB
- # transactions but a bit later, messages available might be called
- # on objects which are not available - or available in an old
- # version - to ZODB connector.
- # So all connectors must be committed now that we have selected
- # everything needed from MySQL to get a fresh view of ZODB objects.
- transaction.commit()
- tv = getTransactionalVariable(None)
- tv['activity_runtime_environment'] = activity_runtime_environment
- # Try to invoke
- try:
- method(*args)
- except:
- LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
- try:
- transaction.abort()
- except:
- # Unfortunately, database adapters may raise an exception against abort.
- LOG('SQLDict', PANIC,
- 'abort failed, thus some objects may be modified accidentally')
- raise
- # XXX Is it still useful to free messages now that this node is able
- # to reselect them ?
- to_free_uid_list = [x.uid for x in message_list]
- try:
- makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
- except:
- LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
- else:
- LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
- # Abort if something failed.
- if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
- endTransaction = transaction.abort
- else:
- endTransaction = transaction.commit
- try:
- endTransaction()
- except:
- LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
- if endTransaction == transaction.abort:
- LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
- else:
- try:
- transaction.abort()
- except:
- LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
- raise
- exc_info = sys.exc_info()
- for m in message_list:
- m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
- try:
- makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
- except:
- LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
- else:
- LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
- self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
- transaction.commit()
- return not message_list
+ dequeueMessage = SQLBase.dequeueMessage
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
More information about the Erp5-report mailing list