[Erp5-report] r34632 jm - in /erp5/trunk/products/CMFActivity: Activity/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Sat Apr 17 00:28:09 CEST 2010


Author: jm
Date: Sat Apr 17 00:28:09 2010
New Revision: 34632

URL: http://svn.erp5.org?rev=34632&view=rev
Log:
Fully respect priorities and dates when distributing activities

- When deleting duplicate messages, keep the one with the highest score
  (priority, date, uid).
- When several messages have the same serialization_tag, always validate the
  one with the highest score first.

Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/tests/testCMFActivity.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=34632&r1=34631&r2=34632&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Sat Apr 17 00:28:09 2010
@@ -507,62 +507,61 @@
         validation_text_dict = {'none': 1}
         message_dict = {}
         for line in result:
-          message = self.loadMessage(line.message, uid = line.uid,
-                                     order_validation_text = line.order_validation_text)
+          message = self.loadMessage(line.message, uid=line.uid, line=line,
+            order_validation_text=line.order_validation_text)
           self.getExecutableMessageList(activity_tool, message, message_dict,
                                         validation_text_dict, now_date=now_date)
 
         if message_dict:
-          message_unique_set = set()
+          def sort_message_key(message):
+            # same sort key as in SQLDict_readMessageList
+            return message.line.priority, message.line.date, message.uid
+          message_unique_dict = {}
+          serialization_tag_dict = {}
+          distributable_uid_set = set()
           deletable_uid_list = []
+
           # remove duplicates
           # SQLDict considers object_path, method_id, tag to unify activities,
           # but ignores method arguments. They are outside of semantics.
-          for key in message_dict.keys():
-            # we manipulate message_dict below so that we cannot use
-            # iterator here.
-            message = message_dict[key]
-            unique_key = self.generateMessageUID(message)
-            if unique_key in message_unique_set:
-              deletable_uid_list.append(message.uid)
-              del message_dict[message.uid]
-            else:
-              message_unique_set.add(unique_key)
+          for message in message_dict.itervalues():
+            message_unique_dict.setdefault(self.generateMessageUID(message),
+                                           []).append(message)
+          for message_list in message_unique_dict.itervalues():
+            if len(message_list) > 1:
+              # Sort list of duplicates to keep the message with highest score
+              message_list.sort(key=sort_message_key)
+              deletable_uid_list += [m.uid for m in message_list[1:]]
+            message = message_list[0]
+            distributable_uid_set.add(message.uid)
+            serialization_tag = message.activity_kw.get('serialization_tag')
+            if serialization_tag is not None:
+              serialization_tag_dict.setdefault(serialization_tag,
+                                                []).append(message)
           # Don't let through if there is the same serialization tag in the
           # message dict. If there is the same serialization tag, only one can
           # be validated and others must wait.
           # But messages with group_method_id are exceptions. serialization_tag
           # does not stop validating together. Because those messages should
           # be processed together at once.
-          serialization_tag_set = set()
-          serialization_tag_group_method_id_dict = {}
-          for key in message_dict.keys():
-            message = message_dict[key]
-            # serialize messages with serialization_tag.
-            serialization_tag = message.activity_kw.get('serialization_tag')
-            group_method_id = message.activity_kw.get('group_method_id')
-            if serialization_tag is not None:
-              if serialization_tag in serialization_tag_set:
-                if group_method_id is not None:
-                  # Only one group_method_id can pass through.
-                  if serialization_tag_group_method_id_dict.get(
-                      serialization_tag,None) != group_method_id:
-                    del message_dict[message.uid]
-                else:
-                  del message_dict[message.uid]
-              else:
-                serialization_tag_set.add(serialization_tag)
-                if group_method_id is not None:
-                  serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
+          for message_list in serialization_tag_dict.itervalues():
+            if len(message_list) == 1:
+              continue
+            # Sort list of messages to validate the message with highest score
+            message_list.sort(key=sort_message_key)
+            group_method_id = message_list[0].activity_kw.get('group_method_id')
+            for message in message_list[1:]:
+              if group_method_id is None or \
+                 group_method_id != message.activity_kw.get('group_method_id'):
+                distributable_uid_set.remove(message.uid)
           if deletable_uid_list:
             activity_tool.SQLBase_delMessage(table=self.sql_table,
                                              uid=deletable_uid_list)
-
-        distributable_count = len(message_dict)
-        if distributable_count:
-          activity_tool.SQLBase_assignMessage(table=self.sql_table,
-            processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
-          validated_count += distributable_count
+          distributable_count = len(distributable_uid_set)
+          if distributable_count:
+            activity_tool.SQLBase_assignMessage(table=self.sql_table,
+              processing_node=0, uid=tuple(distributable_uid_set))
+            validated_count += distributable_count
         if validated_count < MAX_VALIDATED_LIMIT:
           offset += READ_MESSAGE_LIMIT
           result = readMessageList(path=None, method_id=None, processing_node=-1,

Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=34632&r1=34631&r2=34632&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Sat Apr 17 00:28:09 2010
@@ -2720,14 +2720,16 @@
     try:
       Organisation.checkActivityCount = checkActivityCount
       # Adds two same activities.
-      organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a')
-      get_transaction().commit()
-      organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a')
+      organisation.activate(activity='SQLDict', tag='a', priority=2).checkActivityCount(other_tag='a')
+      get_transaction().commit()
+      uid1, = [x.uid for x in activity_tool.getMessageList()]
+      organisation.activate(activity='SQLDict', tag='a', priority=1).checkActivityCount(other_tag='a')
       get_transaction().commit()
       self.assertEqual(len(activity_tool.getMessageList()), 2)
       activity_tool.distribute()
       # After distribute, duplicate is deleted.
-      self.assertEqual(len(activity_tool.getMessageList()), 1)
+      uid2, = [x.uid for x in activity_tool.getMessageList()]
+      self.assertNotEqual(uid1, uid2)
       self.tic()
       self.assertEqual(len(activity_tool.getMessageList()), 0)
       self.assertEqual(len(check_result_dict), 1)
@@ -2886,9 +2888,9 @@
     self.assertEqual(len(result), 0)
     # Second scenario: activate, activate, distribute
     # Both messages must be distributed (this is different from regular tags)
-    organisation.activate(activity=activity, serialization_tag='1').getTitle()
+    organisation.activate(activity=activity, serialization_tag='1', priority=2).getTitle()
     # Use a different method just so that SQLDict doesn't merge both activities prior to insertion.
-    organisation.activate(activity=activity, serialization_tag='1').getId()
+    organisation.activate(activity=activity, serialization_tag='1', priority=1).getId()
     get_transaction().commit()
     result = activity_tool.getMessageList()
     self.assertEqual(len(result), 2)
@@ -2899,9 +2901,11 @@
     # If activity is SQLQueue, this does not happen.
     if activity=='SQLDict':
       # one is validated.
-      self.assertEqual(len([x for x in result if x.processing_node == 0]), 1)
+      message, = [x for x in result if x.processing_node == 0]
+      self.assertEqual(message.method_id, 'getId')
       # the other one is still waiting for validation.
-      self.assertEqual(len([x for x in result if x.processing_node == -1]), 1)
+      message, = [x for x in result if x.processing_node == -1]
+      self.assertEqual(message.method_id, 'getTitle')
     else:
       # both are validated at once.
       self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)




More information about the Erp5-report mailing list