[Erp5-report] r32879 jm - in /erp5/trunk/products/CMFActivity: ./ Activity/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Feb 19 18:56:15 CET 2010
Author: jm
Date: Fri Feb 19 18:56:15 2010
New Revision: 32879
URL: http://svn.erp5.org?rev=32879&view=rev
Log:
Allow an executed activity to decide how to finalize message execution
getActivityRuntimeEnvironment is changed to return an ActivityRuntimeEnvironment
instance instead of a dict (and this value is now stored in a transactional
variable, for automatic cleanup).
This object allows activities to change the default behaviour of CMFActivity if an
error happens. In the future, this object could also allow an executed activity to
inspect its related Message object.
In case of infinite retry, notify the user when the default limit is reached.
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLBase.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/ActivityRuntimeEnvironment.py
erp5/trunk/products/CMFActivity/ActivityTool.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=32879&r1=32878&r2=32879&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] Fri Feb 19 18:56:15 2010
@@ -34,8 +34,6 @@
from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Queue import VALIDATION_ERROR_DELAY
-
-MAX_RETRY = 5
class SQLBase:
@@ -166,16 +164,24 @@
# please, remove the "type(m.exc_type) is type(ConflictError)" check
# and leave only the "issubclass(m.exc_type, ConflictError)" check.
if type(m.exc_type) is type(ConflictError) and \
- issubclass(m.exc_type, ConflictError):
+ m.conflict_retry and issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
else:
+ max_retry = m.max_retry
retry = m.line.retry
- if retry >= MAX_RETRY:
+ if max_retry is not None and retry >= max_retry:
+ # Always notify when we stop retrying.
notify_user_list.append(m)
final_error_uid_list.append(uid)
continue
- # By default, make delay quadratic to the number of retries.
- delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2
+ # In case of infinite retry, notify the user
+ # when the default limit is reached.
+ if max_retry is None and retry == m.__class__.max_retry:
+ notify_user_list.append(m)
+ delay = m.delay
+ if delay is None:
+ # By default, make delay quadratic to the number of retries.
+ delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2
try:
# Immediately update, because values different for every message
activity_tool.SQLBase_reactivate(table=self.sql_table,
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=32879&r1=32878&r2=32879&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Fri Feb 19 18:56:15 2010
@@ -36,7 +36,8 @@
from types import ClassType
#from time import time
from SQLBase import SQLBase
-from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment
+from Products.CMFActivity.ActivityRuntimeEnvironment import (
+ ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter
try:
@@ -315,19 +316,15 @@
# Remove group_id parameter from group_method_id
if group_method_id is not None:
group_method_id = group_method_id.split('\0')[0]
- clearActivityRuntimeEnvironment()
if group_method_id not in (None, ""):
- setActivityRuntimeValue('group_method_id', group_method_id)
method = activity_tool.invokeGroup
args = (group_method_id, message_list)
+ activity_runtime_environment = ActivityRuntimeEnvironment(None)
else:
method = activity_tool.invoke
message = message_list[0]
args = (message, )
- updateActivityRuntimeValue({'activity_kw': message.activity_kw,
- 'priority': message.line.priority,
- 'uid': message.uid})
- setActivityRuntimeValue('processing_node', processing_node)
+ activity_runtime_environment = ActivityRuntimeEnvironment(message)
# Commit right before executing messages.
# As MySQL transaction does not start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called
@@ -336,6 +333,8 @@
# So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit()
+ tv = getTransactionalVariable(None)
+ tv['activity_runtime_environment'] = activity_runtime_environment
# Try to invoke
try:
method(*args)
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=32879&r1=32878&r2=32879&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Fri Feb 19 18:56:15 2010
@@ -37,7 +37,8 @@
from time import time
from sets import ImmutableSet
from SQLBase import SQLBase
-from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment
+from Products.CMFActivity.ActivityRuntimeEnvironment import (
+ ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter
try:
@@ -214,13 +215,10 @@
# So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit()
+ tv = getTransactionalVariable(None)
for m in message_list:
+ tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
processed_count += 1
- clearActivityRuntimeEnvironment()
- updateActivityRuntimeValue({'processing_node': processing_node,
- 'activity_kw': m.activity_kw,
- 'priority': m.line.priority,
- 'uid': m.uid})
# Try to invoke
try:
activity_tool.invoke(m)
Modified: erp5/trunk/products/CMFActivity/ActivityRuntimeEnvironment.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityRuntimeEnvironment.py?rev=32879&r1=32878&r2=32879&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityRuntimeEnvironment.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/ActivityRuntimeEnvironment.py [utf8] Fri Feb 19 18:56:15 2010
@@ -1,33 +1,35 @@
-import threading
-import copy
-
-activity_runtime_environment_container = threading.local()
+from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
def getActivityRuntimeEnvironment():
"""
- Raises AttributeError if called outside activity.
+ Raises KeyError if called outside activity.
"""
- return copy.deepcopy(activity_runtime_environment_container.current)
+ return getTransactionalVariable(None)['activity_runtime_environment']
def _getActivityRuntimeEnvironment():
- current = getattr(activity_runtime_environment_container, 'current', None)
- if current is None:
- current = activity_runtime_environment_container.current = {}
- return current
+ try:
+ return getActivityRuntimeEnvironment()
+ except KeyError:
+ return
-def setActivityRuntimeValue(key, value):
- """
- TODO: protect against unauthorized use ?
- """
- _getActivityRuntimeEnvironment()[key] = value
-def updateActivityRuntimeValue(new_dict):
- """
- TODO: protect against unauthorized use ?
- """
- _getActivityRuntimeEnvironment().update(new_dict)
+class BaseMessage:
-def clearActivityRuntimeEnvironment():
- if getattr(activity_runtime_environment_container, 'current', None) is not None:
- delattr(activity_runtime_environment_container, 'current')
+ delay = None
+ # None means infinite retry
+ max_retry = 5
+ # For errors happening after message invocation (ConflictError),
+ # should we retry quickly without increasing 'retry' count ?
+ conflict_retry = True
+
+class ActivityRuntimeEnvironment(object):
+
+ def __init__(self, message):
+ self._message = message
+
+ def edit(self, **kw):
+ # There is no point allowing to modify other attributes from a message
+ for k in kw:
+ getattr(BaseMessage, k)
+ self._message.__dict__.update(kw)
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=32879&r1=32878&r2=32879&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] Fri Feb 19 18:56:15 2010
@@ -48,6 +48,7 @@
from Acquisition import aq_base
from Acquisition import aq_inner
from ActivityBuffer import ActivityBuffer
+from ActivityRuntimeEnvironment import BaseMessage
from zExceptions import ExceptionFormatter
from BTrees.OIBTree import OIBTree
@@ -153,7 +154,8 @@
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2
-class Message:
+
+class Message(BaseMessage):
"""Activity Message Class.
Message instances are stored in an activity queue, inside the Activity Tool.
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=32879&r1=32878&r2=32879&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Feb 19 18:56:15 2010
@@ -32,6 +32,7 @@
from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import DummyMailHost
+from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
@@ -49,7 +50,6 @@
import cPickle as pickle
from Products.CMFActivity.ActivityTool import Message
import random
-from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment
import threading
try:
@@ -2802,44 +2802,32 @@
delattr(Organisation, 'mustRunAfter')
def CheckActivityRuntimeEnvironment(self, activity):
- organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
- get_transaction().commit()
- self.tic()
- activity_tool = self.getActivityTool()
- check_result_dict = {}
- initial_list_check_value = [1, 2]
+ document = self.portal.organisation_module
+ activity_result = []
def extractActivityRuntimeEnvironment(self):
- setActivityRuntimeValue('list_check', initial_list_check_value)
- environment = self.getActivityRuntimeEnvironment()
- check_result_dict['environment'] = environment
- def runAndCheck():
- check_result_dict.clear()
- self.assertFalse('environment' in check_result_dict)
- get_transaction().commit()
+ activity_result.append(self.getActivityRuntimeEnvironment())
+ document.__class__.doSomething = extractActivityRuntimeEnvironment
+ try:
+ document.activate(activity=activity).doSomething()
+ get_transaction().commit()
+ # Check that getActivityRuntimeEnvironment raises outside of activities
+ self.assertRaises(KeyError, document.getActivityRuntimeEnvironment)
+ # Check Runtime isolation
self.tic()
- self.assertTrue('environment' in check_result_dict)
- Organisation.extractActivityRuntimeEnvironment = extractActivityRuntimeEnvironment
- try:
- # Check that organisation.getActivityRuntimeEnvironment raises outside
- # of activities.
- clearActivityRuntimeEnvironment()
- #organisation.getActivityRuntimeEnvironment()
- self.assertRaises(AttributeError, organisation.getActivityRuntimeEnvironment)
- # Check Runtime isolation
- setActivityRuntimeValue('blah', True)
- organisation.activate(activity=activity).extractActivityRuntimeEnvironment()
- runAndCheck()
- self.assertEqual(check_result_dict['environment'].get('blah'), None)
- # Check Runtime presence
- self.assertTrue(len(check_result_dict['environment']) > 0)
- self.assertTrue('processing_node' in check_result_dict['environment'])
- # Check Runtime does a deepcopy
- self.assertTrue('list_check' in check_result_dict['environment'])
- check_result_dict['environment']['list_check'].append(3)
- self.assertTrue(check_result_dict['environment']['list_check'] != \
- initial_list_check_value)
+ # Check that it still raises outside of activities
+ self.assertRaises(KeyError, document.getActivityRuntimeEnvironment)
+ # Check activity runtime environment instance
+ env = activity_result.pop()
+ self.assertFalse(activity_result)
+ message = env._message
+ self.assertEqual(message.line.priority, 1)
+ self.assertEqual(message.object_path, document.getPhysicalPath())
+ self.assertTrue(message.conflict_retry) # default value
+ env.edit(max_retry=0, conflict_retry=False)
+ self.assertFalse(message.conflict_retry) # edited value
+ self.assertRaises(AttributeError, env.edit, foo='bar')
finally:
- delattr(Organisation, 'extractActivityRuntimeEnvironment')
+ del document.__class__.doSomething
def test_104_activityRuntimeEnvironmentSQLDict(self, quiet=0, run=run_all_test):
if not run: return
More information about the Erp5-report
mailing list