[Erp5-report] r19192 - in /erp5/trunk/products/CMFActivity: Activity/ skins/activity/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Feb 8 18:28:50 CET 2008


Author: vincent
Date: Fri Feb  8 18:28:50 2008
New Revision: 19192

URL: http://svn.erp5.org?rev=19192&view=rev
Log:
Instead of deleting duplicate messages, reserve them, and delete them when the "original" message has succeeded.

Added:
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql
Removed:
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql
Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=19192&r1=19191&r2=19192&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Feb  8 18:28:50 2008
@@ -178,20 +178,35 @@
     if len(uid_list):
       activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
 
-  def deleteDuplicatedLineList(self, activity_tool, date, processing_node, line):
-    """
-      Delete all messages matching given one except itself.
-      Operator  Value
-      !=        uid
-      <=        date
-      =         path, method_id, group_method_id, order_validation_text,
-                processing_node, tag
-    """
-    activity_tool.SQLDict_deleteDuplicatedMessageList(
-      processing_node=processing_node, uid=line.uid,
-      path=line.path, method_id=line.method_id,
-      group_method_id=line.group_method_id,
-      order_validation_text=line.order_validation_text)
+  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+    """
+      Reserve unreserved messages matching given line.
+      Return their uids.
+    """
+    try:
+      result = activity_tool.SQLDict_selectDuplicatedLineList(
+        path=line.path,
+        method_id=line.method_id,
+        group_method_id=line.group_method_id,
+        order_validation_text=line.order_validation_text
+      )
+      uid_list = [x.uid for x in result]
+      if len(uid_list):
+        activity_tool.SQLDict_reserveDuplicatedLineList(
+          processing_node=processing_node,
+          uid_list=uid_list
+        )
+      else:
+        # Release locks
+        activity_tool.SQLDict_commit()
+    except:
+      # Log
+      LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info())
+      # Release lock
+      activity_tool.SQLDict_rollback()
+      # And re-raise
+      raise
+    return uid_list
 
   def getProcessableMessageList(self, activity_tool, processing_node):
     """
@@ -217,13 +232,14 @@
       unclean state.
 
       Returned values:
-        3-tuple:
+        4-tuple:
           - list of 3-tuple:
             - message uid
             - message
             - priority
           - impacted object count
           - group_method_id
+          - uid_to_duplicate_uid_list_dict
     """
     def getReservedMessageList(**kw):
       line_list = self.getReservedMessageList(activity_tool=activity_tool,
@@ -233,9 +249,12 @@
       if len(line_list):
         LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
       return line_list
-    def deleteDuplicatedLineList(line):
-      self.deleteDuplicatedLineList(activity_tool=activity_tool, date=now_date,
-                                 processing_node=processing_node, line=line)
+    def getDuplicateMessageUidList(line):
+      uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool, 
+        line=line, processing_node=processing_node)
+      if len(uid_list):
+        LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
+      return uid_list
     def makeMessageListAvailable(uid_list):
       self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
     BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
@@ -248,14 +267,14 @@
     group_method_id = None
     try:
       result = getReservedMessageList(limit=1)
+      uid_to_duplicate_uid_list_dict = {}
       if len(result) > 0:
         line = result[0]
         m = self.loadMessage(line.message, uid=line.uid)
         append(line, m)
         group_method_id = line.group_method_id
-        # Delete all messages matching current one - except current one.
-        deleteDuplicatedLineList(line)
         activity_tool.SQLDict_processMessage(uid=[line.uid])
+        uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
         if group_method_id not in (None, '', '\0'):
           # Count the number of objects to prevent too many objects.
           count += len(m.getObjectList(activity_tool))
@@ -272,11 +291,12 @@
               # So what remains to be filtered on are path, method_id,
               # order_validation_text, tag
               key = (line.path, line.method_id, line.order_validation_text, line.tag)
-              if key in path_and_method_id_dict:
-                LOG('SQLDict', TRACE, 'Duplicate of message %r has been skipped (it should already be deleted anyway): %r' % (path_and_method_id_dict[key], line.uid))
+              original_uid = path_and_method_id_dict.get(key)
+              if original_uid is not None:
+                uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
                 continue
               path_and_method_id_dict[key] = line.uid
-              deleteDuplicatedLineList(line)
+              uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
               if count < MAX_GROUPED_OBJECTS:
                 m = self.loadMessage(line.message, uid=line.uid)
                 count += len(m.getObjectList(activity_tool))
@@ -286,7 +306,7 @@
             activity_tool.SQLDict_processMessage(uid=[x[0] for x in message_list])
             # Unreserve extra messages as soon as possible.
             makeMessageListAvailable(unreserve_uid_list)
-      return message_list, count, group_method_id
+      return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
     except:
       LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
       if len(message_list):
@@ -300,9 +320,9 @@
             LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
       else:
         LOG('SQLDict', TRACE, '(no message was reserved)')
-      return [], 0, None
-
-  def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
+      return [], 0, None, {}
+
+  def finalizeMessageExecution(self, activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict):
     """
       If everything was fine, delete all messages.
       If anything failed, make successfull messages available (if any), and
@@ -326,11 +346,18 @@
       if m.is_executed:
         if something_failed:
           make_available_uid_list.append(uid)
+          make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
         else:
           deletable_uid_list.append(uid)
+          deletable_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
           if m.active_process:
+            # XXX: Bug here: Even if a duplicate message has an active_process,
+            # it won't be called on the duplicate.
             message_with_active_process_list.append(m)
       else:
+        # Whatever happens, duplicate uids are to be made available. Only
+        # executed message will get to lower priority or error state.
+        make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
         exc_type = m.exc_info[0]
         if type(exc_type) is ClassType and \
            issubclass(exc_type, ConflictError):
@@ -388,9 +415,14 @@
 
   # Queue semantic
   def dequeueMessage(self, activity_tool, processing_node):
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    message_uid_priority_list, count, group_method_id = \
+    def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
+      final_uid_list = []
+      for uid in uid_list:
+        final_uid_list.append(uid)
+        final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
+    message_uid_priority_list, count, group_method_id, \
+      uid_to_duplicate_uid_list_dict = \
       self.getProcessableMessageList(activity_tool, processing_node)
     if len(message_uid_priority_list):
       # Remove group_id parameter from group_method_id
@@ -425,7 +457,7 @@
           raise
         to_free_uid_list = [x[0] for x in message_uid_priority_list]
         try:
-          makeMessageListAvailable(to_free_uid_list)
+          makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
         except:
           LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
         else:
@@ -451,12 +483,12 @@
           x[1].is_executed = 0
         failed_message_uid_list = [x[0] for x in message_uid_priority_list]
         try:
-          makeMessageListAvailable(failed_message_uid_list)
+          makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict)
         except:
           LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (failed_message_uid_list, ), error=sys.exc_info())
         else:
           LOG('SQLDict', TRACE, 'Freed messages %r' % (failed_message_uid_list, ))
-      self.finalizeMessageExecution(activity_tool, message_uid_priority_list)
+      self.finalizeMessageExecution(activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict)
     get_transaction().commit()
     return not len(message_uid_priority_list)
 

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql?rev=19192&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql Fri Feb  8 18:28:50 2008
@@ -1,0 +1,11 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:1000
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params></params>
+COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql?rev=19191&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql (removed)
@@ -1,28 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>
-processing_node
-uid
-path
-method_id
-group_method_id
-order_validation_text
-</params>
-DELETE FROM
-  message
-WHERE
-  processing_node IN (0, <dtml-sqlvar processing_node type="int">)
-  AND uid != <dtml-sqlvar uid type="int">
-  AND path = <dtml-sqlvar path type="string">
-  AND method_id = <dtml-sqlvar method_id type="string">
-  AND group_method_id = <dtml-sqlvar group_method_id type="string">
-  AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
-<dtml-var sql_delimiter>
-COMMIT

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql?rev=19192&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql Fri Feb  8 18:28:50 2008
@@ -1,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>
+processing_node
+uid_list
+</params>
+UPDATE
+  message
+SET
+  processing_node=<dtml-sqlvar processing_node type="int">
+WHERE
+  uid IN (<dtml-in uid_list><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>)
+<dtml-var sql_delimiter>
+COMMIT

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql?rev=19192&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql Fri Feb  8 18:28:50 2008
@@ -1,0 +1,24 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>
+path
+method_id
+group_method_id
+order_validation_text
+</params>
+SELECT uid FROM
+  message
+WHERE
+  processing_node = 0
+  AND path = <dtml-sqlvar path type="string">
+  AND method_id = <dtml-sqlvar method_id type="string">
+  AND group_method_id = <dtml-sqlvar group_method_id type="string">
+  AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
+FOR UPDATE




More information about the Erp5-report mailing list