[Erp5-report] r8158 - /erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Jun 23 15:06:10 CEST 2006
Author: yo
Date: Fri Jun 23 15:06:08 2006
New Revision: 8158
URL: http://svn.erp5.org?rev=8158&view=rev
Log:
Ensure that SQLQueue never leaves messages marked as processed.
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=8158&r1=8157&r2=8158&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Jun 23 15:06:08 2006
@@ -95,45 +95,56 @@
# Make sure message can not be processed anylonger
activity_tool.SQLQueue_processMessage(uid=line.uid)
get_transaction().commit() # Release locks before starting a potentially long calculation
- m = self.loadMessage(line.message)
- # Make sure object exists
- validation_state = m.validate(self, activity_tool)
- if validation_state is not VALID:
- if validation_state in (EXCEPTION, INVALID_PATH):
- if line.priority > MAX_PRIORITY:
- # This is an error
- activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
- # Assign message back to 'error' state
- #m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
+
+ # At this point, the message is marked as processed.
+ try:
+ m = self.loadMessage(line.message)
+ # Make sure object exists
+ validation_state = m.validate(self, activity_tool)
+ if validation_state is not VALID:
+ if validation_state in (EXCEPTION, INVALID_PATH):
+ if line.priority > MAX_PRIORITY:
+ # This is an error.
+ # Assign message back to 'error' state.
+ activity_tool.SQLQueue_assignMessage(uid=line.uid,
+ processing_node = VALIDATE_ERROR_STATE)
+ get_transaction().commit() # and commit
+ else:
+ # Lower priority
+ activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
else:
- # Lower priority
- activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
+ # We do not lower priority for INVALID_ORDER errors but we do postpone execution
+ activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
+ priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation
+ return 0
+ except:
+ # If any exception occurs, catch it and delay the operation.
+ get_transaction().abort()
+ activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
+ priority = line.priority)
+ get_transaction().commit()
+ return 0
+
+ # Try to invoke
+ activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
+ if m.is_executed: # Make sure message could be invoked
+ activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
+ get_transaction().commit() # If successful, commit
+ else:
+ get_transaction().abort() # If not, abort transaction and start a new one
+ if line.priority > MAX_PRIORITY:
+ # This is an error
+ activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
+ # Assign message back to 'error' state
+ m.notifyUser(activity_tool) # Notify Error
+ get_transaction().commit() # and commit
else:
- # We do not lower priority for INVALID_ORDER errors but we do postpone execution
- activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
- priority = line.priority)
+ # Lower priority
+ activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
+ priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
- else:
- # Try to invoke
- activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
- if m.is_executed: # Make sure message could be invoked
- activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
- get_transaction().commit() # If successful, commit
- else:
- get_transaction().abort() # If not, abort transaction and start a new one
- if line.priority > MAX_PRIORITY:
- # This is an error
- activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
- # Assign message back to 'error' state
- m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
- else:
- # Lower priority
- activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
- priority = line.priority + 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1
More information about the Erp5-report
mailing list