[Erp5-report] r8157 - /erp5/trunk/products/CMFActivity/Activity/SQLDict.py

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Jun 23 14:51:55 CEST 2006


Author: yo
Date: Fri Jun 23 14:51:52 2006
New Revision: 8157

URL: http://svn.erp5.org?rev=8157&view=rev
Log: (empty)

Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=8157&r1=8156&r2=8157&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Jun 23 14:51:52 2006
@@ -222,8 +222,8 @@
       method_id = line.method_id
       order_validation_text = line.order_validation_text
       uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
-                                                    processing_node = None, to_date = now_date,
-                                                    order_validation_text = order_validation_text)
+                                                   processing_node = None, to_date = now_date,
+                                                   order_validation_text = order_validation_text)
       uid_list = [x.uid for x in uid_list]
       uid_list_list = [uid_list]
       priority_list = [line.priority]
@@ -233,20 +233,21 @@
         activity_tool.SQLDict_processMessage(uid = uid_list)
       get_transaction().commit() # Release locks before starting a potentially long calculation
       # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
-      m = self.loadMessage(line.message, uid = line.uid)
-      message_list = [m]
-      # Validate message (make sure object exists, priority OK, etc.)
-      if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+
+      # At this point, messages are marked as processed. So catch any kind of exception to make sure
+      # that they are unmarked on error.
+      try:
+        m = self.loadMessage(line.message, uid = line.uid)
+        message_list = [m]
+        # Validate message (make sure object exists, priority OK, etc.)
+        if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+          return 0
+
         group_method_id = m.activity_kw.get('group_method_id')
         if group_method_id is not None:
           # Count the number of objects to prevent too many objects.
           if m.hasExpandMethod():
-            try:
-              count = len(m.getObjectList(activity_tool))
-            except:
-              # Here, simply ignore an exception. The same exception should be handled later.
-              LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
-              count = 0
+            count = len(m.getObjectList(activity_tool))
           else:
             count = 1
 
@@ -255,8 +256,8 @@
           if count < MAX_GROUPED_OBJECTS:
             # Retrieve objects which have the same group method.
             result = readMessage(processing_node = processing_node, priority = priority,
-                                 to_date = now_date, group_method_id = group_method_id,
-                                 order_validation_text = order_validation_text)
+                                to_date = now_date, group_method_id = group_method_id,
+                                order_validation_text = order_validation_text)
             #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
             for line in result:
               path = line.path
@@ -269,78 +270,90 @@
                 # Set selected messages to processing
                 activity_tool.SQLDict_processMessage(uid = uid_list)
               get_transaction().commit() # Release locks before starting a potentially long calculation
+
+              # Save this newly marked uids as soon as possible.
+              uid_list_list.append(uid_list)
+
               m = self.loadMessage(line.message, uid = line.uid)
               if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
                 if m.hasExpandMethod():
-                  try:
-                    count += len(m.getObjectList(activity_tool))
-                  except:
-                    # Here, simply ignore an exception. The same exception should be handled later.
-                    LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
-                    pass
+                  count += len(m.getObjectList(activity_tool))
                 else:
                   count += 1
                 message_list.append(m)
-                uid_list_list.append(uid_list)
                 priority_list.append(line.priority)
                 if count >= MAX_GROUPED_OBJECTS:
                   break
-
-        # Release locks before starting a potentially long calculation
+              else:
+                # If the uids were not valid, remove them from the list, as validateMessage
+                # unmarked them.
+                uid_list_list.pop()
+
+          # Release locks before starting a potentially long calculation
+          get_transaction().commit()
+      except:
+        # If an exception occurs, abort the transaction to minimize the impact,
+        # then simply delay the operations.
+        get_transaction().abort()
+        for uid_list in uid_list_list:
+          activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+                                            retry = 1)
         get_transaction().commit()
-        # Try to invoke
-        if group_method_id is not None:
-          LOG('SQLDict', TRACE,
-              'invoking a group method %s with %d objects '\
-              ' (%d objects in expanded form)' % (
-              group_method_id, len(message_list), count))
-          activity_tool.invokeGroup(group_method_id, message_list)
+        return 0
+
+      # Try to invoke
+      if group_method_id is not None:
+        LOG('SQLDict', TRACE,
+            'invoking a group method %s with %d objects '\
+            ' (%d objects in expanded form)' % (
+            group_method_id, len(message_list), count))
+        activity_tool.invokeGroup(group_method_id, message_list)
+      else:
+        activity_tool.invoke(message_list[0])
+
+      # Check if messages are executed successfully.
+      # When some of them are executed successfully, it may not be acceptable to
+      # abort the transaction, because these remain pending, only due to other
+      # invalid messages. This means that a group method should not be used if
+      # it has a side effect. For now, only indexing uses a group method, and this
+      # has no side effect.
+      for m in message_list:
+        if m.is_executed:
+          break
+      else:
+        get_transaction().abort()
+
+      for i in xrange(len(message_list)):
+        m = message_list[i]
+        uid_list = uid_list_list[i]
+        priority = priority_list[i]
+        if m.is_executed:
+          activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
+          get_transaction().commit()                                        # If successful, commit
+          if m.active_process:
+            active_process = activity_tool.unrestrictedTraverse(m.active_process)
+            if not active_process.hasActivity():
+              # No more activity
+              m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
         else:
-          activity_tool.invoke(message_list[0])
-
-        # Check if messages are executed successfully.
-        # When some of them are executed successfully, it may not be acceptable to
-        # abort the transaction, because these remain pending, only due to other
-        # invalid messages. This means that a group method should not be used if
-        # it has a side effect. For now, only indexing uses a group method, and this
-        # has no side effect.
-        for m in message_list:
-          if m.is_executed:
-            break
-        else:
-          get_transaction().abort()
-
-        for i in xrange(len(message_list)):
-          m = message_list[i]
-          uid_list = uid_list_list[i]
-          priority = priority_list[i]
-          if m.is_executed:
-            activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
-            get_transaction().commit()                                        # If successful, commit
-            if m.active_process:
-              active_process = activity_tool.unrestrictedTraverse(m.active_process)
-              if not active_process.hasActivity():
-                # No more activity
-                m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
+          if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
+            # If this is a conflict error, do not lower the priority but only delay.
+            activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+                                              retry = 1)
+            get_transaction().commit() # Release locks before starting a potentially long calculation
+          elif priority > MAX_PRIORITY:
+            # This is an error
+            if len(uid_list) > 0:
+              activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
+                                                                              # Assign message back to 'error' state
+            m.notifyUser(activity_tool)                                       # Notify Error
+            get_transaction().commit()                                        # and commit
           else:
-            if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
-              # If this is a conflict error, do not lower the priority but only delay.
+            # Lower priority
+            if len(uid_list) > 0:
               activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
-                                                retry = 1)
+                                                priority = priority + 1, retry = 1)
               get_transaction().commit() # Release locks before starting a potentially long calculation
-            elif priority > MAX_PRIORITY:
-              # This is an error
-              if len(uid_list) > 0:
-                activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
-                                                                                # Assign message back to 'error' state
-              m.notifyUser(activity_tool)                                       # Notify Error
-              get_transaction().commit()                                        # and commit
-            else:
-              # Lower priority
-              if len(uid_list) > 0:
-                activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
-                                                  priority = priority + 1, retry = 1)
-                get_transaction().commit() # Release locks before starting a potentially long calculation
 
       return 0
     get_transaction().commit() # Release locks before starting a potentially long calculation




More information about the Erp5-report mailing list