[Erp5-report] r8158 - /erp5/trunk/products/CMFActivity/Activity/SQLQueue.py

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Jun 23 15:06:10 CEST 2006


Author: yo
Date: Fri Jun 23 15:06:08 2006
New Revision: 8158

URL: http://svn.erp5.org?rev=8158&view=rev
Log:
Ensure that SQLQueue never leave messages as processed.

Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=8158&r1=8157&r2=8158&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Jun 23 15:06:08 2006
@@ -95,45 +95,56 @@
       # Make sure message can not be processed anylonger
       activity_tool.SQLQueue_processMessage(uid=line.uid)
       get_transaction().commit() # Release locks before starting a potentially long calculation
-      m = self.loadMessage(line.message)
-      # Make sure object exists
-      validation_state = m.validate(self, activity_tool)
-      if validation_state is not VALID:
-        if validation_state in (EXCEPTION, INVALID_PATH):
-          if line.priority > MAX_PRIORITY:
-            # This is an error
-            activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
-                                                                              # Assign message back to 'error' state
-            #m.notifyUser(activity_tool)                                       # Notify Error
-            get_transaction().commit()                                        # and commit
+
+      # At this point, the message is marked as processed.
+      try:
+        m = self.loadMessage(line.message)
+        # Make sure object exists
+        validation_state = m.validate(self, activity_tool)
+        if validation_state is not VALID:
+          if validation_state in (EXCEPTION, INVALID_PATH):
+            if line.priority > MAX_PRIORITY:
+              # This is an error.
+              # Assign message back to 'error' state.
+              activity_tool.SQLQueue_assignMessage(uid=line.uid,
+                                                   processing_node = VALIDATE_ERROR_STATE)
+              get_transaction().commit()                                        # and commit
+            else:
+              # Lower priority
+              activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
+              get_transaction().commit() # Release locks before starting a potentially long calculation
           else:
-            # Lower priority
-            activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
+            # We do not lower priority for INVALID_ORDER errors but we do postpone execution
+            activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
+                                                priority = line.priority)
             get_transaction().commit() # Release locks before starting a potentially long calculation
+          return 0
+      except:
+        # If any exception occurs, catch it and delay the operation.
+        get_transaction().abort()
+        activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
+                                           priority = line.priority)
+        get_transaction().commit()
+        return 0
+
+      # Try to invoke
+      activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
+      if m.is_executed:                                          # Make sure message could be invoked
+        activity_tool.SQLQueue_delMessage(uid=line.uid)  # Delete it
+        get_transaction().commit()                                        # If successful, commit
+      else:
+        get_transaction().abort()                                         # If not, abort transaction and start a new one
+        if line.priority > MAX_PRIORITY:
+          # This is an error
+          activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
+                                                                            # Assign message back to 'error' state
+          m.notifyUser(activity_tool)                                       # Notify Error
+          get_transaction().commit()                                        # and commit
         else:
-          # We do not lower priority for INVALID_ORDER errors but we do postpone execution
-          activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
-                                              priority = line.priority)
+          # Lower priority
+          activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
+                                              priority = line.priority + 1)
           get_transaction().commit() # Release locks before starting a potentially long calculation
-      else:
-        # Try to invoke
-        activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
-        if m.is_executed:                                          # Make sure message could be invoked
-          activity_tool.SQLQueue_delMessage(uid=line.uid)  # Delete it
-          get_transaction().commit()                                        # If successful, commit
-        else:
-          get_transaction().abort()                                         # If not, abort transaction and start a new one
-          if line.priority > MAX_PRIORITY:
-            # This is an error
-            activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
-                                                                              # Assign message back to 'error' state
-            m.notifyUser(activity_tool)                                       # Notify Error
-            get_transaction().commit()                                        # and commit
-          else:
-            # Lower priority
-            activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
-                                                priority = line.priority + 1)
-            get_transaction().commit() # Release locks before starting a potentially long calculation
       return 0
     get_transaction().commit() # Release locks before starting a potentially long calculation
     return 1




More information about the Erp5-report mailing list