[Erp5-report] r28706 - in /erp5/trunk/products/CMFActivity: Activity/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Sep 1 08:04:48 CEST 2009


Author: yusei
Date: Tue Sep  1 08:04:48 2009
New Revision: 28706

URL: http://svn.erp5.org?rev=28706&view=rev
Log:
Fix a bug: a serialization tag must guarantee that messages sharing the same tag are processed serially, not in parallel.

Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/tests/testCMFActivity.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=28706&r1=28705&r2=28706&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Tue Sep  1 08:04:48 2009
@@ -612,6 +612,7 @@
       now_date = self.getNow(activity_tool)
       result = readMessageList(path=None, method_id=None, processing_node=-1,
                                to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT)
+
       validated_count = 0
       while len(result) and validated_count < MAX_VALIDATED_LIMIT:
         get_transaction().commit()
@@ -623,6 +624,51 @@
                                      order_validation_text = line.order_validation_text)
           self.getExecutableMessageList(activity_tool, message, message_dict,
                                         validation_text_dict, now_date=now_date)
+
+        if message_dict:
+          message_unique_set = set()
+          deletable_uid_list = []
+          # remove duplicates
+          # SQLDict considers object_path, method_id, tag to unify activities,
+          # but ignores method arguments. They are outside of semantics.
+          for key in message_dict.keys():
+            message = message_dict[key]
+            unique_key = (tuple(message.object_path), message.method_id,
+                          message.activity_kw.get('tag'),
+                          message.activity_kw.get('group_id'),
+                          )
+            if unique_key in message_unique_set:
+              deletable_uid_list.append(message.uid)
+              del message_dict[message.uid]
+            else:
+              message_unique_set.add(unique_key)
+          # Do not let a message through if message_dict already holds one with
+          # the same serialization tag: only one can be validated at a time and
+          # the others must wait.
+          # Messages with a group_method_id are the exception: serialization_tag
+          # does not stop simultaneous validation of messages that share that
+          # group_method_id, because such messages will be processed together.
+          serialization_tag_set = set()
+          serialization_tag_group_method_id_dict = {}
+          for key in message_dict.keys():
+            message = message_dict[key]
+            # serialize messages with serialization_tag.
+            serialization_tag = message.activity_kw.get('serialization_tag')
+            group_method_id = message.activity_kw.get('group_method_id')
+            if serialization_tag is not None:
+              if serialization_tag in serialization_tag_set:
+                if group_method_id is not None:
+                  if serialization_tag_group_method_id_dict[serialization_tag]!=group_method_id:
+                    del message_dict[message.uid]
+                else:
+                  del message_dict[message.uid]
+              else:
+                serialization_tag_set.add(serialization_tag)
+                if group_method_id is not None:
+                  serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
+          if deletable_uid_list:
+            activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
+
         distributable_count = len(message_dict)
         if distributable_count:
           activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])

Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=28706&r1=28705&r2=28706&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Tue Sep  1 08:04:48 2009
@@ -2819,7 +2819,13 @@
     self.assertEqual(len(result), 2)
     activity_tool.distribute()
     result = activity_tool.getMessageList()
-    self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
+    # If activity is SQLDict, serialization tag prevents validating the same
+    # serialization tagged messages simultaneously.
+    # If activity is SQLQueue, this does not happen.
+    if activity=='SQLDict':
+      self.assertEqual(len([x for x in result if x.processing_node == 0]), 1)
+    else:
+      self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
     self.tic()
     result = activity_tool.getMessageList()
     self.assertEqual(len(result), 0)




More information about the Erp5-report mailing list