[Erp5-report] r8157 - /erp5/trunk/products/CMFActivity/Activity/SQLDict.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Jun 23 14:51:55 CEST 2006
Author: yo
Date: Fri Jun 23 14:51:52 2006
New Revision: 8157
URL: http://svn.erp5.org?rev=8157&view=rev
Log: (empty)
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=8157&r1=8156&r2=8157&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Jun 23 14:51:52 2006
@@ -222,8 +222,8 @@
method_id = line.method_id
order_validation_text = line.order_validation_text
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
- processing_node = None, to_date = now_date,
- order_validation_text = order_validation_text)
+ processing_node = None, to_date = now_date,
+ order_validation_text = order_validation_text)
uid_list = [x.uid for x in uid_list]
uid_list_list = [uid_list]
priority_list = [line.priority]
@@ -233,20 +233,21 @@
activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
- m = self.loadMessage(line.message, uid = line.uid)
- message_list = [m]
- # Validate message (make sure object exists, priority OK, etc.)
- if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+
+ # At this point, messages are marked as processed. So catch any kind of exception to make sure
+ # that they are unmarked on error.
+ try:
+ m = self.loadMessage(line.message, uid = line.uid)
+ message_list = [m]
+ # Validate message (make sure object exists, priority OK, etc.)
+ if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+ return 0
+
group_method_id = m.activity_kw.get('group_method_id')
if group_method_id is not None:
# Count the number of objects to prevent too many objects.
if m.hasExpandMethod():
- try:
- count = len(m.getObjectList(activity_tool))
- except:
- # Here, simply ignore an exception. The same exception should be handled later.
- LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
- count = 0
+ count = len(m.getObjectList(activity_tool))
else:
count = 1
@@ -255,8 +256,8 @@
if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method.
result = readMessage(processing_node = processing_node, priority = priority,
- to_date = now_date, group_method_id = group_method_id,
- order_validation_text = order_validation_text)
+ to_date = now_date, group_method_id = group_method_id,
+ order_validation_text = order_validation_text)
#LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
for line in result:
path = line.path
@@ -269,78 +270,90 @@
# Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
+
+ # Save this newly marked uids as soon as possible.
+ uid_list_list.append(uid_list)
+
m = self.loadMessage(line.message, uid = line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod():
- try:
- count += len(m.getObjectList(activity_tool))
- except:
- # Here, simply ignore an exception. The same exception should be handled later.
- LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
- pass
+ count += len(m.getObjectList(activity_tool))
else:
count += 1
message_list.append(m)
- uid_list_list.append(uid_list)
priority_list.append(line.priority)
if count >= MAX_GROUPED_OBJECTS:
break
-
- # Release locks before starting a potentially long calculation
+ else:
+ # If the uids were not valid, remove them from the list, as validateMessage
+ # unmarked them.
+ uid_list_list.pop()
+
+ # Release locks before starting a potentially long calculation
+ get_transaction().commit()
+ except:
+ # If an exception occurs, abort the transaction to minimize the impact,
+ # then simply delay the operations.
+ get_transaction().abort()
+ for uid_list in uid_list_list:
+ activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+ retry = 1)
get_transaction().commit()
- # Try to invoke
- if group_method_id is not None:
- LOG('SQLDict', TRACE,
- 'invoking a group method %s with %d objects '\
- ' (%d objects in expanded form)' % (
- group_method_id, len(message_list), count))
- activity_tool.invokeGroup(group_method_id, message_list)
+ return 0
+
+ # Try to invoke
+ if group_method_id is not None:
+ LOG('SQLDict', TRACE,
+ 'invoking a group method %s with %d objects '\
+ ' (%d objects in expanded form)' % (
+ group_method_id, len(message_list), count))
+ activity_tool.invokeGroup(group_method_id, message_list)
+ else:
+ activity_tool.invoke(message_list[0])
+
+ # Check if messages are executed successfully.
+ # When some of them are executed successfully, it may not be acceptable to
+ # abort the transaction, because these remain pending, only due to other
+ # invalid messages. This means that a group method should not be used if
+ # it has a side effect. For now, only indexing uses a group method, and this
+ # has no side effect.
+ for m in message_list:
+ if m.is_executed:
+ break
+ else:
+ get_transaction().abort()
+
+ for i in xrange(len(message_list)):
+ m = message_list[i]
+ uid_list = uid_list_list[i]
+ priority = priority_list[i]
+ if m.is_executed:
+ activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
+ get_transaction().commit() # If successful, commit
+ if m.active_process:
+ active_process = activity_tool.unrestrictedTraverse(m.active_process)
+ if not active_process.hasActivity():
+ # No more activity
+ m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else:
- activity_tool.invoke(message_list[0])
-
- # Check if messages are executed successfully.
- # When some of them are executed successfully, it may not be acceptable to
- # abort the transaction, because these remain pending, only due to other
- # invalid messages. This means that a group method should not be used if
- # it has a side effect. For now, only indexing uses a group method, and this
- # has no side effect.
- for m in message_list:
- if m.is_executed:
- break
- else:
- get_transaction().abort()
-
- for i in xrange(len(message_list)):
- m = message_list[i]
- uid_list = uid_list_list[i]
- priority = priority_list[i]
- if m.is_executed:
- activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
- get_transaction().commit() # If successful, commit
- if m.active_process:
- active_process = activity_tool.unrestrictedTraverse(m.active_process)
- if not active_process.hasActivity():
- # No more activity
- m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
+ if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
+ # If this is a conflict error, do not lower the priority but only delay.
+ activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+ retry = 1)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
+ elif priority > MAX_PRIORITY:
+ # This is an error
+ if len(uid_list) > 0:
+ activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
+ # Assign message back to 'error' state
+ m.notifyUser(activity_tool) # Notify Error
+ get_transaction().commit() # and commit
else:
- if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
- # If this is a conflict error, do not lower the priority but only delay.
+ # Lower priority
+ if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
- retry = 1)
+ priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
- elif priority > MAX_PRIORITY:
- # This is an error
- if len(uid_list) > 0:
- activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
- # Assign message back to 'error' state
- m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
- else:
- # Lower priority
- if len(uid_list) > 0:
- activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
- priority = priority + 1, retry = 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
More information about the Erp5-report
mailing list