[Erp5-report] r19389 - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/ te...
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Feb 19 13:59:08 CET 2008
Author: vincent
Date: Tue Feb 19 13:59:07 2008
New Revision: 19389
URL: http://svn.erp5.org?rev=19389&view=rev
Log:
Implement inter-queue priorities.
Added:
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_getPriority.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_getPriority.zsql
Modified:
erp5/trunk/products/CMFActivity/Activity/Queue.py
erp5/trunk/products/CMFActivity/Activity/SQLBase.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/ActivityTool.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/Queue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/Queue.py?rev=19389&r1=19388&r2=19389&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/Queue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/Queue.py Tue Feb 19 13:59:07 2008
@@ -348,3 +348,13 @@
delay is provided in fractions of day
"""
pass
+
+ def getPriority(self, activity_tool):
+ """
+ Get priority from this queue.
+ Lower number means higher priority value.
+ Legal value range is [1, 6].
+ Values out of this range might work, but are non-standard.
+ """
+ return 6
+
Modified: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=19389&r1=19388&r2=19389&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py Tue Feb 19 13:59:07 2008
@@ -41,3 +41,12 @@
assert len(result) == 1
assert len(result[0]) == 1
return result[0][0]
+
+ def _getPriority(self, activity_tool, method, default):
+ now_date = self.getNow(activity_tool)
+ result = method(to_date=now_date)
+ assert len(result) == 1
+ priority = result[0]['priority']
+ if priority is None:
+ priority = default
+ return priority
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=19389&r1=19388&r2=19389&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Tue Feb 19 13:59:07 2008
@@ -720,4 +720,9 @@
"""
activity_tool.SQLDict_timeShift(delay=delay, processing_node=processing_node,retry=retry)
+ def getPriority(self, activity_tool):
+ method = activity_tool.SQLDict_getPriority
+ default = RAMDict.getPriority(self, activity_tool)
+ return self._getPriority(activity_tool, method, default)
+
registerActivity(SQLDict)
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=19389&r1=19388&r2=19389&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Tue Feb 19 13:59:07 2008
@@ -542,4 +542,9 @@
"""
activity_tool.SQLQueue_timeShift(delay=delay, processing_node=processing_node)
+ def getPriority(self, activity_tool):
+ method = activity_tool.SQLQueue_getPriority
+ default = RAMQueue.getPriority(self, activity_tool)
+ return self._getPriority(activity_tool, method, default)
+
registerActivity(SQLQueue)
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=19389&r1=19388&r2=19389&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Tue Feb 19 13:59:07 2008
@@ -661,15 +661,22 @@
first_run = False
try:
+ #Sort activity list by priority
+ activity_list = activity_dict.values()
+ # Sort method must be local to access "self"
+ def cmpActivities(activity_1, activity_2):
+ return cmp(activity_1.getPriority(self), activity_2.getPriority(self))
+ activity_list.sort(cmpActivities)
+
# Wakeup each queue
- for activity in activity_dict.itervalues():
+ for activity in activity_list:
activity.wakeup(inner_self, processing_node)
# Process messages on each queue in round robin
has_awake_activity = 1
while has_awake_activity:
has_awake_activity = 0
- for activity in activity_dict.itervalues():
+ for activity in activity_list:
activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
finally:
Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_getPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_getPriority.zsql?rev=19389&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_getPriority.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_getPriority.zsql Tue Feb 19 13:59:07 2008
@@ -1,0 +1,18 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>
+to_date
+</params>
+SELECT MIN(`priority`) AS `priority` FROM
+ message
+WHERE
+ processing_node = 0
+ AND date <= <dtml-sqlvar to_date type="datetime">
+
Added: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_getPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_getPriority.zsql?rev=19389&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_getPriority.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_getPriority.zsql Tue Feb 19 13:59:07 2008
@@ -1,0 +1,18 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>
+to_date
+</params>
+SELECT MIN(`priority`) AS `priority` FROM
+ message_queue
+WHERE
+ processing_node = 0
+ AND date <= <dtml-sqlvar to_date type="datetime">
+
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=19389&r1=19388&r2=19389&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py Tue Feb 19 13:59:07 2008
@@ -2625,6 +2625,58 @@
finally:
delattr(Organisation, 'checkActivityCount')
+ def test_103_interQueuePriorities(self, quiet=0, run=run_all_test):
+ """
+ Important note: there is no way to really reliably check that this
+ feature is correctly implemented, as activity execution order is
+ non-deterministic.
+ The best which can be done is to check that under certain circumstances
+ the activity exeicution order match expectations.
+ """
+ if not run: return
+ if not quiet:
+ message = '\nCheck inter-queue priorities'
+ ZopeTestCase._print(message)
+ LOG('Testing... ',0,message)
+ organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
+ get_transaction().commit()
+ self.tic()
+ activity_tool = self.getActivityTool()
+ check_result_dict = {}
+ def runAndCheck():
+ check_result_dict.clear()
+ get_transaction().commit()
+ self.assertEqual(len(check_result_dict), 0)
+ self.tic()
+ self.assertEqual(len(check_result_dict), 2)
+ self.assertTrue(check_result_dict['before_ran'])
+ self.assertTrue(check_result_dict['after_ran'])
+ def mustRunBefore(self):
+ check_result_dict['before_ran'] = 'after_ran' not in check_result_dict
+ def mustRunAfter(self):
+ check_result_dict['after_ran'] = 'before_ran' in check_result_dict
+ Organisation.mustRunBefore = mustRunBefore
+ Organisation.mustRunAfter = mustRunAfter
+ try:
+ # Check that ordering looks good (SQLQueue first)
+ organisation.activate(activity='SQLQueue', priority=1).mustRunBefore()
+ organisation.activate(activity='SQLDict', priority=2).mustRunAfter()
+ runAndCheck()
+ # Check that ordering looks good (SQLDict first)
+ organisation.activate(activity='SQLDict', priority=1).mustRunBefore()
+ organisation.activate(activity='SQLQueue', priority=2).mustRunAfter()
+ runAndCheck()
+ # Check that tag takes precedence over priority (SQLQueue first by priority)
+ organisation.activate(activity='SQLQueue', priority=1, after_tag='a').mustRunAfter()
+ organisation.activate(activity='SQLDict', priority=2, tag='a').mustRunBefore()
+ runAndCheck()
+ # Check that tag takes precedence over priority (SQLDict first by priority)
+ organisation.activate(activity='SQLDict', priority=1, after_tag='a').mustRunAfter()
+ organisation.activate(activity='SQLQueue', priority=2, tag='a').mustRunBefore()
+ runAndCheck()
+ finally:
+ delattr(Organisation, 'mustRunBefore')
+ delattr(Organisation, 'mustRunAfter')
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
More information about the Erp5-report
mailing list