[Erp5-report] r28706 - in /erp5/trunk/products/CMFActivity: Activity/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Sep 1 08:04:48 CEST 2009
Author: yusei
Date: Tue Sep 1 08:04:48 2009
New Revision: 28706
URL: http://svn.erp5.org?rev=28706&view=rev
Log:
Fix a bug: the serialization tag guarantees that tagged messages are processed serially, not in parallel.
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=28706&r1=28705&r2=28706&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Tue Sep 1 08:04:48 2009
@@ -612,6 +612,7 @@
now_date = self.getNow(activity_tool)
result = readMessageList(path=None, method_id=None, processing_node=-1,
to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT)
+
validated_count = 0
while len(result) and validated_count < MAX_VALIDATED_LIMIT:
get_transaction().commit()
@@ -623,6 +624,51 @@
order_validation_text = line.order_validation_text)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
+
+ if message_dict:
+ message_unique_set = set()
+ deletable_uid_list = []
+ # remove duplicates
+ # SQLDict considers object_path, method_id, tag to unify activities,
+ # but ignores method arguments. They are outside of semantics.
+ for key in message_dict.keys():
+ message = message_dict[key]
+ unique_key = (tuple(message.object_path), message.method_id,
+ message.activity_kw.get('tag'),
+ message.activity_kw.get('group_id'),
+ )
+ if unique_key in message_unique_set:
+ deletable_uid_list.append(message.uid)
+ del message_dict[message.uid]
+ else:
+ message_unique_set.add(unique_key)
+ # don't let through if there is the same serialization tag in the
+ # message dict. if there is the same serialization tag, only one can
+ # be validated and others must wait.
+ # but messages with group_method_id are exceptions. serialization_tag
+ # does not stop simultaneous validation. because such messages will
+ # be processed together at once.
+ serialization_tag_set = set()
+ serialization_tag_group_method_id_dict = {}
+ for key in message_dict.keys():
+ message = message_dict[key]
+ # serialize messages with serialization_tag.
+ serialization_tag = message.activity_kw.get('serialization_tag')
+ group_method_id = message.activity_kw.get('group_method_id')
+ if serialization_tag is not None:
+ if serialization_tag in serialization_tag_set:
+ if group_method_id is not None:
+ if serialization_tag_group_method_id_dict[serialization_tag]!=group_method_id:
+ del message_dict[message.uid]
+ else:
+ del message_dict[message.uid]
+ else:
+ serialization_tag_set.add(serialization_tag)
+ if group_method_id is not None:
+ serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
+ if deletable_uid_list:
+ activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
+
distributable_count = len(message_dict)
if distributable_count:
activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=28706&r1=28705&r2=28706&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Tue Sep 1 08:04:48 2009
@@ -2819,7 +2819,13 @@
self.assertEqual(len(result), 2)
activity_tool.distribute()
result = activity_tool.getMessageList()
- self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
+ # If activity is SQLDict, serialization tag prevents validating the same
+ # serialization tagged messages simultaneously.
+ # If activity is SQLQueue, this does not happen.
+ if activity=='SQLDict':
+ self.assertEqual(len([x for x in result if x.processing_node == 0]), 1)
+ else:
+ self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
self.tic()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 0)
More information about the Erp5-report
mailing list