[Erp5-report] r32875 jm - in /erp5/trunk/products/CMFActivity/Activity: SQLDict.py SQLQueue.py

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Feb 19 18:55:40 CET 2010


Author: jm
Date: Fri Feb 19 18:55:40 2010
New Revision: 32875

URL: http://svn.erp5.org?rev=32875&view=rev
Log:
Some cleanup in SQLDict and SQLQueue

Pass a list of Message objects instead of a list of (uid, message, priority)
tuples. Store the fetched SQL line on each Message object so that the priority
can be retrieved from it.

In the future, if we allow an executed activity to access its related Message
object, the activity could also get the SQL line from it.

Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=32875&r1=32874&r2=32875&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Fri Feb 19 18:55:40 2010
@@ -233,10 +233,7 @@
 
       Returned values:
         4-tuple:
-          - list of 3-tuple:
-            - message uid
-            - message
-            - priority
+          - list of messages
           - impacted object count
           - group_method_id
           - uid_to_duplicate_uid_list_dict
@@ -261,8 +258,6 @@
     BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
     now_date = self.getNow(activity_tool)
     message_list = []
-    def append(line, message):
-      message_list.append((line.uid, message, line.priority))
     count = 0
     group_method_id = None
     try:
@@ -271,8 +266,8 @@
       if len(result) > 0:
         line = result[0]
         uid = line.uid
-        m = self.loadMessage(line.message, uid=uid)
-        append(line, m)
+        m = self.loadMessage(line.message, uid=uid, line=line)
+        message_list.append(m)
         group_method_id = line.group_method_id
         activity_tool.SQLDict_processMessage(uid=[uid])
         uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
@@ -302,19 +297,19 @@
               path_and_method_id_dict[key] = line.uid
               uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
               if count < MAX_GROUPED_OBJECTS:
-                m = self.loadMessage(line.message, uid=line.uid)
+                m = self.loadMessage(line.message, uid=line.uid, line=line)
                 count += len(m.getObjectList(activity_tool))
-                append(line, m)
+                message_list.append(m)
               else:
                 unreserve_uid_list.append(line.uid)
-            activity_tool.SQLDict_processMessage(uid=[x[0] for x in message_list])
+            activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
             # Unreserve extra messages as soon as possible.
             makeMessageListAvailable(unreserve_uid_list)
       return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
     except:
       LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
       if len(message_list):
-        to_free_uid_list = [x[0] for x in message_list]
+        to_free_uid_list = [m.uid for m in message_list]
         try:
           makeMessageListAvailable(to_free_uid_list)
         except:
@@ -326,7 +321,7 @@
         LOG('SQLDict', TRACE, '(no message was reserved)')
       return [], 0, None, {}
 
-  def finalizeMessageExecution(self, activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict):
+  def finalizeMessageExecution(self, activity_tool, message_list, uid_to_duplicate_uid_list_dict):
     """
       If everything was fine, delete all messages.
       If anything failed, make successful messages available (if any), and
@@ -345,8 +340,9 @@
     make_available_uid_list = []
     notify_user_list = []
     non_executable_message_list = []
-    something_failed = (len([x for x in message_uid_priority_list if x[1].getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0)
-    for uid, m, priority in message_uid_priority_list:
+    something_failed = (len([m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0)
+    for m in message_list:
+      uid = m.uid
       if m.getExecutionState() == MESSAGE_EXECUTED:
         if something_failed:
           make_available_uid_list.append(uid)
@@ -358,6 +354,7 @@
         # Should duplicate messages follow strictly the original message, or
         # should they be just made available again ?
         make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+        priority = m.line.priority
         # BACK: Only exceptions can be classes in Python 2.6.
         # Once we drop support for Python 2.4, 
         # please, remove the "type(m.exc_type) is type(ConflictError)" check
@@ -431,14 +428,12 @@
         final_uid_list.append(uid)
         final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
       self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
-    message_uid_priority_list, count, group_method_id, \
-      uid_to_duplicate_uid_list_dict = \
+    message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
       self.getProcessableMessageList(activity_tool, processing_node)
-    if len(message_uid_priority_list):
+    if message_list:
       # Remove group_id parameter from group_method_id
       if group_method_id is not None:
         group_method_id = group_method_id.split('\0')[0]
-      message_list = [x[1] for x in message_uid_priority_list]
       clearActivityRuntimeEnvironment()
       if group_method_id not in (None, ""):
         setActivityRuntimeValue('group_method_id', group_method_id)
@@ -449,8 +444,8 @@
         message = message_list[0]
         args = (message, )
         updateActivityRuntimeValue({'activity_kw': message.activity_kw,
-                                    'priority': message_uid_priority_list[0][2],
-                                    'uid': message_uid_priority_list[0][0]})
+                                    'priority': message.line.priority,
+                                    'uid': message.uid})
       setActivityRuntimeValue('processing_node', processing_node)
       # Commit right before executing messages.
       # As MySQL transaction does not start exactly at the same time as ZODB
@@ -464,7 +459,7 @@
       try:
         method(*args)
       except:
-        LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(x[0], x[1].object_path, x[1].method_id) for x in message_uid_priority_list], ), error=sys.exc_info())
+        LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
         try:
           abortTransactionSynchronously()
         except:
@@ -474,7 +469,7 @@
           raise
         # XXX Is it still useful to free messages now that this node is able
         #     to reselect them ?
-        to_free_uid_list = [x[0] for x in message_uid_priority_list]
+        to_free_uid_list = [x.uid for x in message_list]
         try:
           makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
         except:
@@ -482,14 +477,14 @@
         else:
           LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
       # Abort if something failed.
-      if len([x for x in message_uid_priority_list if x[1].getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0:
+      if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
         endTransaction = abortTransactionSynchronously
       else:
         endTransaction = get_transaction().commit
       try:
         endTransaction()
       except:
-        LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(x[0], x[1].object_path, x[1].method_id) for x in message_uid_priority_list], ), error=sys.exc_info())
+        LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
         if endTransaction == abortTransactionSynchronously:
           LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
         else:
@@ -499,18 +494,17 @@
             LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
             raise
         exc_info = sys.exc_info()
-        for x in message_uid_priority_list:
-          x[1].setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
-        failed_message_uid_list = [x[0] for x in message_uid_priority_list]
+        for m in message_list:
+          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
         try:
-          makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict)
+          makeMessageListAvailable(message_list, uid_to_duplicate_uid_list_dict)
         except:
-          LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (failed_message_uid_list, ), error=sys.exc_info())
+          LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
         else:
-          LOG('SQLDict', TRACE, 'Freed messages %r' % (failed_message_uid_list, ))
-      self.finalizeMessageExecution(activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict)
+          LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
+      self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
     get_transaction().commit()
-    return not len(message_uid_priority_list)
+    return not message_list
 
   def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
     hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
@@ -575,7 +569,7 @@
           # This is optimisation with the goal to process objects on the same
           # node and minimize network traffic with ZEO server
           method_dict[line_method_id] = 1
-          m = self.loadMessage(line.message, uid = line.uid)
+          m = self.loadMessage(line.message, uid=line.uid, line=line)
           if invoke:
             # First Validate (only if message is marked as new)
             if line.processing_node == -1:
@@ -612,7 +606,7 @@
     if dumpMessageList is not None:
       result = dumpMessageList()
       for line in result:
-        m = self.loadMessage(line.message, uid = line.uid)
+        m = self.loadMessage(line.message, uid=line.uid, line=line)
         message_list.append(m)
     return message_list
 

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=32875&r1=32874&r2=32875&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Fri Feb 19 18:55:40 2010
@@ -162,10 +162,7 @@
       unclean state.
 
       Returned values:
-        list of 3-tuple:
-          - message uid
-          - message
-          - priority
+        list of messages
     """
     def getReservedMessageList(limit):
       line_list = self.getReservedMessageList(activity_tool=activity_tool,
@@ -179,21 +176,18 @@
       self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
     now_date = self.getNow(activity_tool)
     message_list = []
-    def append(line, message):
-      uid = line.uid
-      message_list.append((uid, message, line.priority))
     try:
       result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
       for line in result:
-        m = self.loadMessage(line.message, uid=line.uid)
-        append(line, m)
+        m = self.loadMessage(line.message, uid=line.uid, line=line)
+        message_list.append(m)
       if len(message_list):
-        activity_tool.SQLQueue_processMessage(uid=[x[0] for x in message_list])
+        activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
       return message_list
     except:
       LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
       if len(message_list):
-        to_free_uid_list = [x[0] for x in message_list]
+        to_free_uid_list = [m.uid for m in message_list]
         try:
           makeMessageListAvailable(to_free_uid_list)
         except:
@@ -205,7 +199,7 @@
         LOG('SQLQueue', TRACE, '(no message was reserved)')
       return []
 
-  def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
+  def finalizeMessageExecution(self, activity_tool, message_list):
     def makeMessageListAvailable(uid_list):
       self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
     deletable_uid_list = []
@@ -213,10 +207,12 @@
     final_error_uid_list = []
     notify_user_list = []
     non_executable_message_list = []
-    for uid, m, priority in message_uid_priority_list:
+    for m in message_list:
+      uid = m.uid
       if m.getExecutionState() == MESSAGE_EXECUTED:
         deletable_uid_list.append(uid)
       elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
+        priority = m.line.priority
         # BACK: Only exceptions can be classes in Python 2.6.
         # Once we drop support for Python 2.4, 
         # please, remove the "type(m.exc_type) is type(ConflictError)" check
@@ -288,11 +284,11 @@
   def dequeueMessage(self, activity_tool, processing_node):
     def makeMessageListAvailable(uid_list):
       self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    message_uid_priority_list = \
+    message_list = \
       self.getProcessableMessageList(activity_tool, processing_node)
-    if len(message_uid_priority_list):
+    if message_list:
       processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
-      processed_message_uid_list = []
+      processed_count = 0
       # Commit right before executing messages.
       # As MySQL transaction does not start exactly at the same time as ZODB
       # transactions but a bit later, messages available might be called
@@ -301,17 +297,17 @@
       # So all connectors must be committed now that we have selected
       # everything needed from MySQL to get a fresh view of ZODB objects.
       get_transaction().commit()
-      for value in message_uid_priority_list:
+      for m in message_list:
+        processed_count += 1
         clearActivityRuntimeEnvironment()
         updateActivityRuntimeValue({'processing_node': processing_node,
-                                    'activity_kw': value[1].activity_kw,
-                                    'priority': value[2],
-                                    'uid': value[0]})
-        processed_message_uid_list.append(value)
+                                    'activity_kw': m.activity_kw,
+                                    'priority': m.line.priority,
+                                    'uid': m.uid})
         # Try to invoke
         try:
-          activity_tool.invoke(value[1])
-          if value[1].getExecutionState() != MESSAGE_NOT_EXECUTED:
+          activity_tool.invoke(m)
+          if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
             # Commit so that if a message raises it doesn't causes previous
             # successfull messages to be rolled back. This commit might fail,
             # so it is protected the same way as activity execution by the
@@ -321,7 +317,8 @@
             # This message failed, revert.
             abortTransactionSynchronously()
         except:
-          LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % ((value[0], value[1].object_path, value[1].method_id), ), error=sys.exc_info())
+          value = m.uid, m.object_path, m.method_id
+          LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
           try:
             abortTransactionSynchronously()
           except:
@@ -331,11 +328,11 @@
           # We must make sure that the message is not set as executed.
           # It is possible that the message is executed but the commit
           # of the transaction fails
-          value[1].setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
+          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
           # XXX Is it still useful to free message now that this node is able
           #     to reselect it ?
           try:
-            makeMessageListAvailable([value[0]])
+            makeMessageListAvailable([m.uid])
           except:
             LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
           else:
@@ -344,18 +341,18 @@
           LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
           break
       # Release all unprocessed messages
-      processed_uid_set = ImmutableSet([x[0] for x in processed_message_uid_list])
-      to_free_uid_list = [x[0] for x in message_uid_priority_list if x[0] not in processed_uid_set]
-      if len(to_free_uid_list):
+      to_free_uid_list = [m.uid for m in message_list[processed_count:]]
+      if to_free_uid_list:
         try:
           makeMessageListAvailable(to_free_uid_list)
         except:
           LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
         else:
           LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
-      self.finalizeMessageExecution(activity_tool, processed_message_uid_list)
+      self.finalizeMessageExecution(activity_tool,
+                                    message_list[:processed_count])
     get_transaction().commit()
-    return not len(message_uid_priority_list)
+    return not message_list
 
 
   def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
@@ -411,7 +408,7 @@
       for line in result:
         path = line.path
         method_id = line.method_id
-        m = self.loadMessage(line.message, uid = line.uid)
+        m = self.loadMessage(line.message, uid=line.uid, line=line)
         if invoke:
           # First Validate (only if message is marked as new)
           if line.processing_node == -1:
@@ -467,7 +464,7 @@
     if dumpMessageList is not None:
       result = dumpMessageList()
       for line in result:
-        m = self.loadMessage(line.message, uid = line.uid)
+        m = self.loadMessage(line.message, uid=line.uid, line=line)
         message_list.append(m)
     return message_list
 




More information about the Erp5-report mailing list