[Erp5-report] r34632 jm - in /erp5/trunk/products/CMFActivity: Activity/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Sat Apr 17 00:28:09 CEST 2010
Author: jm
Date: Sat Apr 17 00:28:09 2010
New Revision: 34632
URL: http://svn.erp5.org?rev=34632&view=rev
Log:
Fully respect priorities and dates when distributing activities
- When deleting duplicate messages, keep the one with the highest score (priority,
date, uid).
- When several messages have the same serialization_tag, always validate first
the one with the highest score.
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=34632&r1=34631&r2=34632&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Sat Apr 17 00:28:09 2010
@@ -507,62 +507,61 @@
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
- message = self.loadMessage(line.message, uid = line.uid,
- order_validation_text = line.order_validation_text)
+ message = self.loadMessage(line.message, uid=line.uid, line=line,
+ order_validation_text=line.order_validation_text)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
if message_dict:
- message_unique_set = set()
+ def sort_message_key(message):
+ # same sort key as in SQLDict_readMessageList
+ return message.line.priority, message.line.date, message.uid
+ message_unique_dict = {}
+ serialization_tag_dict = {}
+ distributable_uid_set = set()
deletable_uid_list = []
+
# remove duplicates
# SQLDict considers object_path, method_id, tag to unify activities,
# but ignores method arguments. They are outside of semantics.
- for key in message_dict.keys():
- # we manipulate message_dict below so that we cannot use
- # iterator here.
- message = message_dict[key]
- unique_key = self.generateMessageUID(message)
- if unique_key in message_unique_set:
- deletable_uid_list.append(message.uid)
- del message_dict[message.uid]
- else:
- message_unique_set.add(unique_key)
+ for message in message_dict.itervalues():
+ message_unique_dict.setdefault(self.generateMessageUID(message),
+ []).append(message)
+ for message_list in message_unique_dict.itervalues():
+ if len(message_list) > 1:
+ # Sort list of duplicates to keep the message with highest score
+ message_list.sort(key=sort_message_key)
+ deletable_uid_list += [m.uid for m in message_list[1:]]
+ message = message_list[0]
+ distributable_uid_set.add(message.uid)
+ serialization_tag = message.activity_kw.get('serialization_tag')
+ if serialization_tag is not None:
+ serialization_tag_dict.setdefault(serialization_tag,
+ []).append(message)
# Don't let through if there is the same serialization tag in the
# message dict. If there is the same serialization tag, only one can
# be validated and others must wait.
# But messages with group_method_id are exceptions. serialization_tag
# does not stop validating together. Because those messages should
# be processed together at once.
- serialization_tag_set = set()
- serialization_tag_group_method_id_dict = {}
- for key in message_dict.keys():
- message = message_dict[key]
- # serialize messages with serialization_tag.
- serialization_tag = message.activity_kw.get('serialization_tag')
- group_method_id = message.activity_kw.get('group_method_id')
- if serialization_tag is not None:
- if serialization_tag in serialization_tag_set:
- if group_method_id is not None:
- # Only one group_method_id can pass through.
- if serialization_tag_group_method_id_dict.get(
- serialization_tag,None) != group_method_id:
- del message_dict[message.uid]
- else:
- del message_dict[message.uid]
- else:
- serialization_tag_set.add(serialization_tag)
- if group_method_id is not None:
- serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
+ for message_list in serialization_tag_dict.itervalues():
+ if len(message_list) == 1:
+ continue
+ # Sort list of messages to validate the message with highest score
+ message_list.sort(key=sort_message_key)
+ group_method_id = message_list[0].activity_kw.get('group_method_id')
+ for message in message_list[1:]:
+ if group_method_id is None or \
+ group_method_id != message.activity_kw.get('group_method_id'):
+ distributable_uid_set.remove(message.uid)
if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list)
-
- distributable_count = len(message_dict)
- if distributable_count:
- activity_tool.SQLBase_assignMessage(table=self.sql_table,
- processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
- validated_count += distributable_count
+ distributable_count = len(distributable_uid_set)
+ if distributable_count:
+ activity_tool.SQLBase_assignMessage(table=self.sql_table,
+ processing_node=0, uid=tuple(distributable_uid_set))
+ validated_count += distributable_count
if validated_count < MAX_VALIDATED_LIMIT:
offset += READ_MESSAGE_LIMIT
result = readMessageList(path=None, method_id=None, processing_node=-1,
Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=34632&r1=34631&r2=34632&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Sat Apr 17 00:28:09 2010
@@ -2720,14 +2720,16 @@
try:
Organisation.checkActivityCount = checkActivityCount
# Adds two same activities.
- organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a')
- get_transaction().commit()
- organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a')
+ organisation.activate(activity='SQLDict', tag='a', priority=2).checkActivityCount(other_tag='a')
+ get_transaction().commit()
+ uid1, = [x.uid for x in activity_tool.getMessageList()]
+ organisation.activate(activity='SQLDict', tag='a', priority=1).checkActivityCount(other_tag='a')
get_transaction().commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
# After distribute, duplicate is deleted.
- self.assertEqual(len(activity_tool.getMessageList()), 1)
+ uid2, = [x.uid for x in activity_tool.getMessageList()]
+ self.assertNotEqual(uid1, uid2)
self.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0)
self.assertEqual(len(check_result_dict), 1)
@@ -2886,9 +2888,9 @@
self.assertEqual(len(result), 0)
# Second scenario: activate, activate, distribute
# Both messages must be distributed (this is different from regular tags)
- organisation.activate(activity=activity, serialization_tag='1').getTitle()
+ organisation.activate(activity=activity, serialization_tag='1', priority=2).getTitle()
# Use a different method just so that SQLDict doesn't merge both activities prior to insertion.
- organisation.activate(activity=activity, serialization_tag='1').getId()
+ organisation.activate(activity=activity, serialization_tag='1', priority=1).getId()
get_transaction().commit()
result = activity_tool.getMessageList()
self.assertEqual(len(result), 2)
@@ -2899,9 +2901,11 @@
# If activity is SQLQueue, this does not happen.
if activity=='SQLDict':
# one is validated.
- self.assertEqual(len([x for x in result if x.processing_node == 0]), 1)
+ message, = [x for x in result if x.processing_node == 0]
+ self.assertEqual(message.method_id, 'getId')
# the other one is still waiting for validation.
- self.assertEqual(len([x for x in result if x.processing_node == -1]), 1)
+ message, = [x for x in result if x.processing_node == -1]
+ self.assertEqual(message.method_id, 'getTitle')
else:
# both are validated at once.
self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
More information about the Erp5-report
mailing list