[Erp5-report] r37687 jm - in /erp5/trunk/products/CMFActivity: Activity/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Aug 11 10:36:39 CEST 2010
Author: jm
Date: Wed Aug 11 10:36:38 2010
New Revision: 37687
URL: http://svn.erp5.org?rev=37687&view=rev
Log:
CMFActivity: make SQLQueue behave like SQLDict for the distribution of serialized, grouped messages.
Note: the 'distribute' method is not merged into SQLBase, for two reasons:
- SQLQueue still differs from SQLDict because it does not remove duplicate messages.
- The 'order_validation_text' column only exists in the 'message' table.
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=37687&r1=37686&r2=37687&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Wed Aug 11 10:36:38 2010
@@ -300,9 +300,10 @@ class SQLDict(RAMDict, SQLBase):
message_list.sort(key=sort_message_key)
deletable_uid_list += [m.uid for m in message_list[1:]]
message = message_list[0]
- distributable_uid_set.add(message.uid)
serialization_tag = message.activity_kw.get('serialization_tag')
- if serialization_tag is not None:
+ if serialization_tag is None:
+ distributable_uid_set.add(message.uid)
+ else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
# Don't let through if there is the same serialization tag in the
@@ -312,15 +313,15 @@ class SQLDict(RAMDict, SQLBase):
# does not stop validating together. Because those messages should
# be processed together at once.
for message_list in serialization_tag_dict.itervalues():
- if len(message_list) == 1:
- continue
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
+ distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].activity_kw.get('group_method_id')
+ if group_method_id is None:
+ continue
for message in message_list[1:]:
- if group_method_id is None or \
- group_method_id != message.activity_kw.get('group_method_id'):
- distributable_uid_set.remove(message.uid)
+ if group_method_id == message.activity_kw.get('group_method_id'):
+ distributable_uid_set.add(message.uid)
if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list)
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=37687&r1=37686&r2=37687&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Wed Aug 11 10:36:38 2010
@@ -255,6 +255,12 @@ class SQLQueue(RAMQueue, SQLBase):
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
+ group_method_id = message_list[0].activity_kw.get('group_method_id')
+ if group_method_id is None:
+ continue
+ for message in message_list[1:]:
+ if group_method_id == message.activity_kw.get('group_method_id'):
+ distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set)
if distributable_count:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=37687&r1=37686&r2=37687&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Wed Aug 11 10:36:38 2010
@@ -3798,7 +3798,49 @@ class TestCMFActivity(ERP5TypeTestCase,
activity_tool.manageClearActivities(keep=0)
finally:
SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
-
+
+ def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self):
+ activity_tool = self.portal.portal_activities
+ obj1 = activity_tool.newActiveProcess()
+ obj2 = activity_tool.newActiveProcess()
+ transaction.commit()
+ self.tic()
+ group_method_call_list = []
+ def doSomething(self, message_list):
+ group_method_call_list.append(sorted((obj.getPath(), args, kw)
+ for obj, args, kw in message_list))
+ del message_list[:]
+ activity_tool.__class__.doSomething = doSomething
+ try:
+ for activity in 'SQLDict', 'SQLQueue':
+ activity_kw = dict(activity=activity, serialization_tag=self.id(),
+ group_method_id='portal_activities/doSomething')
+ obj1.activate(**activity_kw).dummy(1, x=None)
+ obj2.activate(**activity_kw).dummy(2, y=None)
+ transaction.commit()
+ activity_tool.distribute()
+ activity_tool.tic()
+ self.assertEqual(group_method_call_list.pop(),
+ sorted([(obj1.getPath(), (1,), dict(x=None)),
+ (obj2.getPath(), (2,), dict(y=None))]))
+ self.assertFalse(group_method_call_list)
+ self.assertFalse(activity_tool.getMessageList())
+ obj1.activate(priority=2, **activity_kw).dummy1(1, x=None)
+ obj1.activate(priority=1, **activity_kw).dummy2(2, y=None)
+ message1 = obj1.getPath(), (1,), dict(x=None)
+ message2 = obj1.getPath(), (2,), dict(y=None)
+ transaction.commit()
+ activity_tool.distribute()
+ self.assertEqual(len(activity_tool.getMessageList()), 2)
+ activity_tool.tic()
+ self.assertEqual(group_method_call_list.pop(),
+ dict(SQLDict=[message2],
+ SQLQueue=[message1, message2])[activity])
+ self.assertFalse(group_method_call_list)
+ self.assertFalse(activity_tool.getMessageList())
+ finally:
+ del activity_tool.__class__.doSomething
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
More information about the Erp5-report mailing list