[Erp5-report] r20311 - in /erp5/trunk/products/CMFActivity: ./ Activity/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Apr 4 17:39:07 CEST 2008
Author: vincent
Date: Fri Apr 4 17:39:06 2008
New Revision: 20311
URL: http://svn.erp5.org?rev=20311&view=rev
Log:
Use symbolic constants instead of values 0 and 1 for Message.is_executed.
Mark messages as not executable when either their path or the method to call on it cannot be retrieved.
Make messages marked as not executable immediately fail with VALIDATE_ERROR_STATE state.
Modified:
erp5/trunk/products/CMFActivity/Activity/RAMDict.py
erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/ActivityTool.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/RAMDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMDict.py?rev=20311&r1=20310&r2=20311&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMDict.py Fri Apr 4 17:39:06 2008
@@ -26,7 +26,7 @@
#
##############################################################################
-from Products.CMFActivity.ActivityTool import registerActivity
+from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED
from Products.CMFActivity.Errors import ActivityFlushError
from Queue import Queue, VALID
@@ -83,7 +83,7 @@
for key, m in self.getDict(path).items():
if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m)
- if m.is_executed:
+ if m.is_executed == MESSAGE_EXECUTED:
del self.getDict(path)[key]
get_transaction().commit()
return 0
@@ -133,7 +133,7 @@
# First Validate
if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
- if not m.is_executed: # Make sure message could be invoked
+ if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
@@ -158,7 +158,7 @@
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
if invoke:
activity_tool.invoke(m)
- if m.is_executed:
+ if m.is_executed == MESSAGE_EXECUTED:
method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m)
else:
Modified: erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMQueue.py?rev=20311&r1=20310&r2=20311&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMQueue.py Fri Apr 4 17:39:06 2008
@@ -26,7 +26,7 @@
#
##############################################################################
-from Products.CMFActivity.ActivityTool import registerActivity
+from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED
from Queue import Queue, VALID
try:
@@ -70,7 +70,7 @@
get_transaction().commit() # Start a new transaction
return 0 # Keep on ticking
activity_tool.invoke(m)
- if m.is_executed:
+ if m.is_executed == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction
return 0 # Keep on ticking
@@ -117,7 +117,7 @@
else:
if invoke:
activity_tool.invoke(m)
- if m.is_executed:
+ if m.is_executed == MESSAGE_EXECUTED:
activity_tool.unregisterMessage(self, m)
else:
activity_tool.unregisterMessage(self, m)
@@ -130,7 +130,7 @@
else:
if invoke:
activity_tool.invoke(m)
- if m.is_executed:
+ if m.is_executed == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Only delete if no error happens
else:
self.deleteMessage(activity_tool, m)
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=20311&r1=20310&r2=20311&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Apr 4 17:39:06 2008
@@ -26,7 +26,7 @@
#
##############################################################################
-from Products.CMFActivity.ActivityTool import registerActivity
+from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously
from RAMDict import RAMDict
@@ -344,9 +344,10 @@
make_available_uid_list = []
message_with_active_process_list = []
notify_user_list = []
- something_failed = (len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0)
+ non_executable_message_list = []
+ something_failed = (len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0)
for uid, m, priority in message_uid_priority_list:
- if m.is_executed:
+ if m.is_executed == MESSAGE_EXECUTED:
if something_failed:
make_available_uid_list.append(uid)
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
@@ -357,7 +358,7 @@
# XXX: Bug here: Even if a duplicate message has an active_process,
# it won't be called on the duplicate.
message_with_active_process_list.append(m)
- else:
+ elif m.is_executed == MESSAGE_NOT_EXECUTED:
# Should duplicate messages follow strictly the original message, or
# should they be just made available again ?
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
@@ -376,6 +377,11 @@
except:
LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
delay_uid_list.append(uid)
+ else:
+ # Internal CMFActivity error: the message can not be executed because
+ # something is missing (context object cannot be found, method cannot
+ # be accessed on object).
+ non_executable_message_list.append(uid)
if len(deletable_uid_list):
try:
self._retryOnLockError(activity_tool.SQLDict_delMessage, kw={'uid': deletable_uid_list})
@@ -396,6 +402,11 @@
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLDict', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
+ if len(non_executable_message_list):
+ try:
+ activity_tool.SQLDict_assignMessage(uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
+ except:
+ LOG('SQLDict', ERROR, 'Failed to set message to invalid path state for %r' % (non_executable_message_list, ), error=sys.exc_info())
if len(make_available_uid_list):
try:
makeMessageListAvailable(make_available_uid_list)
@@ -472,7 +483,7 @@
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
# Abort if something failed.
- if len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0:
+ if len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0:
endTransaction = abortTransactionSynchronously
else:
endTransaction = get_transaction().commit
@@ -489,7 +500,7 @@
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
raise
for x in message_uid_priority_list:
- x[1].is_executed = 0
+ x[1].is_executed = MESSAGE_NOT_EXECUTED
failed_message_uid_list = [x[0] for x in message_uid_priority_list]
try:
makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict)
@@ -541,7 +552,7 @@
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
- if not m.is_executed: # Make sure message could be invoked
+ if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
@@ -572,7 +583,7 @@
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
# LOG('SQLDict.flush m.is_executed',0,m.is_executed)
- if not m.is_executed: # Make sure message could be invoked
+ if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=20311&r1=20310&r2=20311&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Apr 4 17:39:06 2008
@@ -26,7 +26,7 @@
#
##############################################################################
-from Products.CMFActivity.ActivityTool import registerActivity
+from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously
@@ -197,12 +197,13 @@
final_error_uid_list = []
message_with_active_process_list = []
notify_user_list = []
+ non_executable_message_list = []
for uid, m, priority in message_uid_priority_list:
- if m.is_executed:
+ if m.is_executed == MESSAGE_EXECUTED:
deletable_uid_list.append(uid)
if m.active_process:
message_with_active_process_list.append(m)
- else:
+ elif m.is_executed == MESSAGE_NOT_EXECUTED:
if type(m.exc_type) is ClassType and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
@@ -224,6 +225,11 @@
LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, ))
+ else:
+ # Internal CMFActivity error: the message can not be executed because
+ # something is missing (context object cannot be found, method cannot
+ # be accessed on object).
+ non_executable_message_list.append(uid)
if len(deletable_uid_list):
try:
self._retryOnLockError(activity_tool.SQLQueue_delMessage, kw={'uid': deletable_uid_list})
@@ -249,6 +255,12 @@
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLQueue', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
+ if len(non_executable_message_list):
+ try:
+ activity_tool.SQLQueue_assignMessage(uid=non_executable_message_list,
+ processing_node=VALIDATE_ERROR_STATE)
+ except:
+ LOG('SQLQueue', ERROR, 'Failed to set message to invalid path state for %r' % (final_error_uid_list, ), error=sys.exc_info())
try:
for m in notify_user_list:
m.notifyUser(activity_tool)
@@ -287,7 +299,7 @@
# Try to invoke
try:
activity_tool.invoke(value[1])
- if value[1].is_executed:
+ if value[1].is_executed != MESSAGE_NOT_EXECUTED:
# Commit so that if a message raises it doesn't causes previous
# successfull messages to be rolled back. This commit might fail,
# so it is protected the same way as activity execution by the
@@ -307,7 +319,7 @@
# We must make sure that the message is not set as executed.
# It is possible that the message is executed but the commit
# of the transaction fails
- value[1].is_executed = 0
+ value[1].is_executed = MESSAGE_NOT_EXECUTED
try:
makeMessageListAvailable([value[0]])
except:
@@ -368,7 +380,7 @@
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
- if not m.is_executed: # Make sure message could be invoked
+ if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
@@ -391,7 +403,7 @@
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
- if not m.is_executed: # Make sure message could be invoked
+ if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=20311&r1=20310&r2=20311&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Fri Apr 4 17:39:06 2008
@@ -104,6 +104,10 @@
activity_instance = activity()
activity_dict[activity.__name__] = activity_instance
+MESSAGE_NOT_EXECUTED = 0
+MESSAGE_EXECUTED = 1
+MESSAGE_NOT_EXECUTABLE = 2
+
class Message:
"""Activity Message Class.
@@ -128,7 +132,7 @@
self.method_id = method_id
self.args = args
self.kw = kw
- self.is_executed = 0
+ self.is_executed = MESSAGE_NOT_EXECUTED
self.exc_type = None
self.exc_value = None
self.traceback = None
@@ -206,30 +210,41 @@
def __call__(self, activity_tool):
try:
obj = self.getObject(activity_tool)
- old_security_manager = getSecurityManager()
- # Change user if required (TO BE DONE)
- # We will change the user only in order to execute this method
- user = self.changeUser(self.user_name, activity_tool)
+ except KeyError:
+ self.is_executed = MESSAGE_NOT_EXECUTABLE
+ else:
try:
- result = getattr(obj, self.method_id)(*self.args, **self.kw)
- finally:
- setSecurityManager(old_security_manager)
-
- self.activateResult(activity_tool, result, obj)
- self.is_executed = 1
- except:
- self.is_executed = 0
- exc_info = sys.exc_info()
- self.exc_type = exc_info[0]
- self.exc_value = str(exc_info[1])
- self.traceback = ''.join(ExceptionFormatter.format_exception(
- *exc_info))
- LOG('ActivityTool', WARNING,
- 'Could not call method %s on object %s' % (
- self.method_id, self.object_path), error=exc_info)
- # push the error in ZODB error_log
- if getattr(activity_tool, 'error_log', None) is not None:
- activity_tool.error_log.raising(exc_info)
+ old_security_manager = getSecurityManager()
+ # Change user if required (TO BE DONE)
+ # We will change the user only in order to execute this method
+ user = self.changeUser(self.user_name, activity_tool)
+ # XXX: There is no check to see if user is allowed to access
+ # that method !
+ method = getattr(obj, self.method_id, None)
+ try:
+ if method is None:
+ self.is_executed = MESSAGE_NOT_EXECUTABLE
+ else:
+ result = method(*self.args, **self.kw)
+ finally:
+ setSecurityManager(old_security_manager)
+
+ if method is not None:
+ self.activateResult(activity_tool, result, obj)
+ self.is_executed = MESSAGE_EXECUTED
+ except:
+ self.is_executed = MESSAGE_NOT_EXECUTED
+ exc_info = sys.exc_info()
+ self.exc_type = exc_info[0]
+ self.exc_value = str(exc_info[1])
+ self.traceback = ''.join(ExceptionFormatter.format_exception(
+ *exc_info))
+ LOG('ActivityTool', WARNING,
+ 'Could not call method %s on object %s' % (
+ self.method_id, self.object_path), error=exc_info)
+ # push the error in ZODB error_log
+ if getattr(activity_tool, 'error_log', None) is not None:
+ activity_tool.error_log.raising(exc_info)
def validate(self, activity, activity_tool, check_order_validation=1):
return activity.validate(activity_tool, self,
@@ -846,13 +861,17 @@
expanded_object_list = []
new_message_list = []
path_dict = {}
- # Filter the list of messages. If an object is not available, ignore such a message.
+ # Filter the list of messages. If an object is not available, mark its message as non-executable.
# In addition, expand an object if necessary, and make sure that no duplication happens.
for m in message_list:
# alternate method is used to segregate objects which cannot be grouped.
alternate_method_id = m.activity_kw.get('alternate_method_id')
try:
obj = m.getObject(self)
+ except KeyError:
+ m.is_executed = MESSAGE_NOT_EXECUTABLE
+ continue
+ try:
i = len(new_message_list) # This is an index of this message in new_message_list.
if m.hasExpandMethod():
for subobj in m.getObjectList(self):
@@ -890,7 +909,7 @@
object_list.append(obj)
new_message_list.append(m)
except:
- m.is_executed = 0
+ m.is_executed = MESSAGE_NOT_EXECUTED
exc_info = sys.exc_info()
m.exc_type = exc_info[0]
m.exc_value = str(exc_info[1])
@@ -917,7 +936,7 @@
traceback = ''.join(ExceptionFormatter.format_exception(
*exc_info))
for m in new_message_list:
- m.is_executed = 0
+ m.is_executed = MESSAGE_NOT_EXECUTED
m.exc_type = exc_type
m.exc_value = exc_value
m.traceback = traceback
@@ -937,16 +956,16 @@
object = object_list[i]
m = new_message_list[i]
if i in failed_message_dict:
- m.is_executed = 0
+ m.is_executed = MESSAGE_NOT_EXECUTED
LOG('ActivityTool', WARNING,
'the method %s partially failed on object %s' %
(m.method_id, m.object_path,))
else:
try:
m.activateResult(self, result, object)
- m.is_executed = 1
+ m.is_executed = MESSAGE_EXECUTED
except:
- m.is_executed = 0
+ m.is_executed = MESSAGE_NOT_EXECUTED
m.exc_type = sys.exc_info()[0]
LOG('ActivityTool', WARNING,
'Could not call method %s on object %s' % (
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=20311&r1=20310&r2=20311&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py Fri Apr 4 17:39:06 2008
@@ -2826,6 +2826,93 @@
finally:
delattr(Organisation, 'checkAbsoluteUrl')
+ def CheckMissingActivityContextObject(self, activity):
+ """
+ Check that a message whose context has ben deleted goes to -3
+ processing_node.
+ This must happen on first message execution, without any delay.
+ """
+ readMessageList = getattr(self.getPortalObject(), '%s_readMessageList' % (activity, ))
+ activity_tool = self.getActivityTool()
+ container = self.getPortal().organisation_module
+ organisation = container.newContent(portal_type='Organisation')
+ get_transaction().commit()
+ self.tic()
+ organisation.activate(activity=activity).getTitle()
+ get_transaction().commit()
+ self.assertEqual(len(activity_tool.getMessageList()), 1)
+ # Here, we delete the subobject using most low-level method, to avoid
+ # pending activity to be removed.
+ organisation_id = organisation.id
+ container._delOb(organisation_id)
+ del organisation # Avoid keeping a reference to a deleted object.
+ get_transaction().commit()
+ self.assertEqual(getattr(container, organisation_id, None), None)
+ self.assertEqual(len(activity_tool.getMessageList()), 1)
+ activity_tool.distribute()
+ self.assertEquals(len(readMessageList(processing_node=-3,
+ include_processing=1)), 0)
+ activity_tool.tic()
+ self.assertEquals(len(readMessageList(processing_node=-3,
+ include_processing=1)), 1)
+
+ def test_109_checkMissingActivityContextObjectSQLDict(self, quiet=0,
+ run=run_all_test):
+ if not run: return
+ if not quiet:
+ message = '\nCheck missing activity context object (SQLDict)'
+ ZopeTestCase._print(message)
+ LOG('Testing... ',0,message)
+ self.CheckMissingActivityContextObject('SQLDict')
+
+ def test_110_checkMissingActivityContextObjectSQLQueue(self, quiet=0,
+ run=run_all_test):
+ if not run: return
+ if not quiet:
+ message = '\nCheck missing activity context object (SQLQueue)'
+ ZopeTestCase._print(message)
+ LOG('Testing... ',0,message)
+ self.CheckMissingActivityContextObject('SQLQueue')
+
+ def test_111_checkMissingActivityContextObjectSQLDict(self, quiet=0,
+ run=run_all_test):
+ """
+ This is similar to tst 108, but here the object will be missing for an
+ activity with a group_method_id.
+ """
+ if not run: return
+ if not quiet:
+ message = '\nCheck missing activity context object with ' \
+ 'group_method_id (SQLDict)'
+ ZopeTestCase._print(message)
+ LOG('Testing... ',0,message)
+ readMessageList = self.getPortalObject().SQLDict_readMessageList
+ activity_tool = self.getActivityTool()
+ container = self.getPortalObject().organisation_module
+ organisation = container.newContent(portal_type='Organisation')
+ organisation_2 = container.newContent(portal_type='Organisation')
+ get_transaction().commit()
+ self.tic()
+ organisation.reindexObject()
+ organisation_2.reindexObject()
+ get_transaction().commit()
+ self.assertEqual(len(activity_tool.getMessageList()), 2)
+ # Here, we delete the subobject using most low-level method, to avoid
+ # pending activity to be removed.
+ organisation_id = organisation.id
+ container._delOb(organisation_id)
+ del organisation # Avoid keeping a reference to a deleted object.
+ get_transaction().commit()
+ self.assertEqual(getattr(container, organisation_id, None), None)
+ self.assertEqual(len(activity_tool.getMessageList()), 2)
+ activity_tool.distribute()
+ self.assertEquals(len(readMessageList(processing_node=-3,
+ include_processing=1)), 0)
+ activity_tool.tic()
+ self.assertEquals(len(readMessageList(processing_node=-3,
+ include_processing=1)), 1)
+ # The message excuted on "organisation_2" must have succeeded.
+ self.assertEqual(len(activity_tool.getMessageList()), 1)
def test_suite():
suite = unittest.TestSuite()
More information about the Erp5-report
mailing list