[Erp5-report] r37686 jm - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/...

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Aug 11 10:35:15 CEST 2010


Author: jm
Date: Wed Aug 11 10:35:15 2010
New Revision: 37686

URL: http://svn.erp5.org?rev=37686&view=rev
Log:
CMFActivity: add message grouping support for processing SQLQueue

Added:
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql
      - copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql
      - copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql
      - copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql
      - copied, changed from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
Removed:
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLBase.py
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
    erp5/trunk/products/CMFActivity/ActivityTool.py
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
    erp5/trunk/products/CMFActivity/tests/testCMFActivity.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] Wed Aug 11 10:35:15 2010
@@ -148,12 +148,15 @@ class SQLBase:
         This number is guaranted not to be exceeded.
         If None (or not given) no limit apply.
     """
-    result = not group_method_id and \
-      activity_tool.SQLDict_selectReservedMessageList(
-        processing_node=processing_node, count=limit)
+    select = activity_tool.SQLBase_selectReservedMessageList
+    result = not group_method_id and select(table=self.sql_table, count=limit,
+                                            processing_node=processing_node)
     if not result:
-      activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
-      result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
+      activity_tool.SQLBase_reserveMessageList(table=self.sql_table,
+        count=limit, processing_node=processing_node, to_date=date,
+        group_method_id=group_method_id)
+      result = select(table=self.sql_table,
+                      processing_node=processing_node, count=limit)
     return result
 
   def makeMessageListAvailable(self, activity_tool, uid_list):
@@ -161,7 +164,8 @@ class SQLBase:
       Put messages back in processing_node=0 .
     """
     if len(uid_list):
-      activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
+      activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
+                                                     uid=uid_list)
 
   def getProcessableMessageList(self, activity_tool, processing_node):
     """
@@ -222,7 +226,7 @@ class SQLBase:
         m = self.loadMessage(line.message, uid=uid, line=line)
         message_list.append(m)
         group_method_id = line.group_method_id
-        activity_tool.SQLDict_processMessage(uid=[uid])
+        activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
         uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
           .extend(getDuplicateMessageUidList(line))
         if group_method_id not in (None, '', '\0'):
@@ -243,22 +247,26 @@ class SQLBase:
               # We read each line once so lines have distinct uids.
               # So what remains to be filtered on are path, method_id and
               # order_validation_text.
-              key = (line.path, line.method_id, line.order_validation_text)
-              original_uid = path_and_method_id_dict.get(key)
-              if original_uid is not None:
-                uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
-                .append(line.uid)
-                continue
-              path_and_method_id_dict[key] = line.uid
-              uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
-              .extend(getDuplicateMessageUidList(line))
+              try:
+                key = line.path, line.method_id, line.order_validation_text
+              except AttributeError:
+                pass # message_queue does not have 'order_validation_text'
+              else:
+                original_uid = path_and_method_id_dict.get(key)
+                if original_uid is not None:
+                  uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
+                  .append(line.uid)
+                  continue
+                path_and_method_id_dict[key] = line.uid
+                uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
+                .extend(getDuplicateMessageUidList(line))
               if count < MAX_GROUPED_OBJECTS:
                 m = self.loadMessage(line.message, uid=line.uid, line=line)
                 count += len(m.getObjectList(activity_tool))
                 message_list.append(m)
               else:
                 unreserve_uid_list.append(line.uid)
-            activity_tool.SQLDict_processMessage(
+            activity_tool.SQLBase_processMessage(table=self.sql_table,
               uid=[m.uid for m in message_list])
             # Unreserve extra messages as soon as possible.
             self.makeMessageListAvailable(activity_tool=activity_tool,
@@ -298,7 +306,8 @@ class SQLBase:
         group_method_id = group_method_id.split('\0')[0]
       if group_method_id not in (None, ""):
         method  = activity_tool.invokeGroup
-        args = (group_method_id, message_list)
+        args = (group_method_id, message_list, self.__class__.__name__,
+                self.merge_duplicate)
         activity_runtime_environment = ActivityRuntimeEnvironment(None)
       else:
         method = activity_tool.invoke

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Wed Aug 11 10:35:15 2010
@@ -52,6 +52,7 @@ class SQLDict(RAMDict, SQLBase):
     because use of OOBTree.
   """
   sql_table = 'message'
+  merge_duplicate = True
 
   # Transaction commit methods
   def prepareQueueMessageList(self, activity_tool, message_list):

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Wed Aug 11 10:35:15 2010
@@ -29,15 +29,9 @@
 from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
 from RAMQueue import RAMQueue
 from Queue import VALID, INVALID_PATH
-from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 from Products.CMFActivity.Errors import ActivityFlushError
 from ZODB.POSException import ConflictError
-from types import ClassType
-import sys
-from time import time
 from SQLBase import SQLBase, sort_message_key
-from Products.CMFActivity.ActivityRuntimeEnvironment import (
-  ActivityRuntimeEnvironment, getTransactionalVariable)
 from zExceptions import ExceptionFormatter
 
 import transaction
@@ -48,17 +42,6 @@ from zLOG import LOG, WARNING, ERROR, IN
 MAX_VALIDATED_LIMIT = 1000
 # Read this many messages to validate.
 READ_MESSAGE_LIMIT = 1000
-# Process this many messages in each dequeueMessage call.
-# Downside of setting to a "small" value: the cost of reserving a batch of
-# few messages increases relatively to the cost of executing activities,
-# making CMFActivity overhead significant.
-# Downside of setting to a "big" value: if there are many slow activities in
-# a multi-activity-node environment, multiple slow activities will be reserved
-# by a single node, making a suboptimal use of the parallelisation offered by
-# the cluster.
-# Before increasing this value, consider using SQLDict with group methods
-# first.
-MESSAGE_BUNDLE_SIZE = 1
 
 MAX_MESSAGE_LIST_SIZE = 100
 
@@ -69,6 +52,7 @@ class SQLQueue(RAMQueue, SQLBase):
     because use of OOBTree.
   """
   sql_table = 'message_queue'
+  merge_duplicate = False
 
   def prepareQueueMessageList(self, activity_tool, message_list):
     message_list = [m for m in message_list if m.is_registered]
@@ -83,6 +67,8 @@ class SQLQueue(RAMQueue, SQLBase):
       method_id_list = [m.method_id for m in registered_message_list]
       priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list]
       date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list]
+      group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
+                              for message in registered_message_list]
       tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list]
       serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list]
       dumped_message_list = [self.dumpMessage(m) for m in registered_message_list]
@@ -92,6 +78,7 @@ class SQLQueue(RAMQueue, SQLBase):
                                               method_id_list=method_id_list,
                                               priority_list=priority_list,
                                               message_list=dumped_message_list,
+                                              group_method_id_list = group_method_id_list,
                                               date_list=date_list,
                                               tag_list=tag_list,
                                               processing_node_list=None,
@@ -110,162 +97,14 @@ class SQLQueue(RAMQueue, SQLBase):
     # Nothing to do in SQLQueue.
     pass
 
-  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
+  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
     """
-      Get and reserve a list of messages.
-      limit
-        Maximum number of messages to fetch.
-        This number is not garanted to be reached, because of:
-         - not enough messages being pending execution
-         - race condition (other nodes reserving the same messages at the same
-           time)
-        This number is guaranted not to be exceeded.
-        If None (or not given) no limit apply.
+      Reserve unreserved messages matching given line.
+      Return their uids.
     """
-    result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
-    if len(result) == 0:
-      activity_tool.SQLQueue_reserveMessageList(count=limit, processing_node=processing_node, to_date=date)
-      result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
-    return result
-
-  def makeMessageListAvailable(self, activity_tool, uid_list):
-    """
-      Put messages back in processing_node=0 .
-    """
-    if len(uid_list):
-      activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
-
-  def getProcessableMessageList(self, activity_tool, processing_node):
-    """
-      Always true:
-        For each reserved message, delete redundant messages when it gets
-        reserved (definitely lost, but they are expandable since redundant).
-
-      - reserve a message
-      - set reserved message to processing=1 state
-      - if this message has a group_method_id:
-        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
-        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
-          - get one message from the reserved bunch (this messages will be
-            "needed")
-          - increase the number of impacted object
-        - set "needed" reserved messages to processing=1 state
-        - unreserve "unneeded" messages
-      - return still-reserved message list
-
-      If any error happens in above described process, try to unreserve all
-      messages already reserved in that process.
-      If it fails, complain loudly that some messages might still be in an
-      unclean state.
-
-      Returned values:
-        list of messages
-    """
-    def getReservedMessageList(limit):
-      line_list = self.getReservedMessageList(activity_tool=activity_tool,
-                                              date=now_date,
-                                              processing_node=processing_node,
-                                              limit=limit)
-      if len(line_list):
-        LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
-      return line_list
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    now_date = self.getNow(activity_tool)
-    message_list = []
-    try:
-      result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
-      for line in result:
-        m = self.loadMessage(line.message, uid=line.uid, line=line)
-        message_list.append(m)
-      if len(message_list):
-        activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
-      return message_list
-    except:
-      LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
-      if len(message_list):
-        to_free_uid_list = [m.uid for m in message_list]
-        try:
-          makeMessageListAvailable(to_free_uid_list)
-        except:
-          LOG('SQLQueue', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
-        else:
-          if len(to_free_uid_list):
-            LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
-      else:
-        LOG('SQLQueue', TRACE, '(no message was reserved)')
-      return []
-
-  def dequeueMessage(self, activity_tool, processing_node):
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    message_list = \
-      self.getProcessableMessageList(activity_tool, processing_node)
-    if message_list:
-      processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
-      processed_count = 0
-      # Commit right before executing messages.
-      # As MySQL transaction does not start exactly at the same time as ZODB
-      # transactions but a bit later, messages available might be called
-      # on objects which are not available - or available in an old
-      # version - to ZODB connector.
-      # So all connectors must be committed now that we have selected
-      # everything needed from MySQL to get a fresh view of ZODB objects.
-      transaction.commit()
-      tv = getTransactionalVariable(None)
-      for m in message_list:
-        tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
-        processed_count += 1
-        # Try to invoke
-        try:
-          activity_tool.invoke(m)
-          if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
-            # Commit so that if a message raises it doesn't causes previous
-            # successfull messages to be rolled back. This commit might fail,
-            # so it is protected the same way as activity execution by the
-            # same "try" block.
-            transaction.commit()
-          else:
-            # This message failed, abort.
-            transaction.abort()
-        except:
-          value = m.uid, m.object_path, m.method_id
-          LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
-          try:
-            transaction.abort()
-          except:
-            # Unfortunately, database adapters may raise an exception against abort.
-            LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
-            raise
-          # We must make sure that the message is not set as executed.
-          # It is possible that the message is executed but the commit
-          # of the transaction fails
-          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
-          # XXX Is it still useful to free message now that this node is able
-          #     to reselect it ?
-          try:
-            makeMessageListAvailable([m.uid])
-          except:
-            LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
-          else:
-            LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
-        if time() > processing_stop_time:
-          LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
-          break
-      # Release all unprocessed messages
-      to_free_uid_list = [m.uid for m in message_list[processed_count:]]
-      if to_free_uid_list:
-        try:
-          makeMessageListAvailable(to_free_uid_list)
-        except:
-          LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
-        else:
-          LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
-      self.finalizeMessageExecution(activity_tool,
-                                    message_list[:processed_count])
-    transaction.commit()
-    return not message_list
+    return ()
 
+  dequeueMessage = SQLBase.dequeueMessage
 
   def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
     hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)

Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py [utf8] Wed Aug 11 10:35:15 2010
@@ -1172,7 +1172,7 @@ class ActivityTool (Folder, UniqueObject
         for held in my_self.REQUEST._held:
           self.REQUEST._hold(held)
 
-    def invokeGroup(self, method_id, message_list):
+    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
       if self.activity_tracking:
         activity_tracking_logger.info(
           'invoking group messages: method_id=%s, paths=%s'
@@ -1202,20 +1202,22 @@ class ActivityTool (Folder, UniqueObject
           else:
             subobject_list = (obj,)
           for subobj in subobject_list:
-            path = subobj.getPath()
-            if path not in path_set:
+            if merge_duplicate:
+              path = subobj.getPath()
+              if path in path_set:
+                continue
               path_set.add(path)
-              if alternate_method_id is not None \
-                 and hasattr(aq_base(subobj), alternate_method_id):
-                # if this object is alternated,
-                # generate a new single active object
-                activity_kw = m.activity_kw.copy()
-                activity_kw.pop('group_method_id', None)
-                activity_kw.pop('group_id', None)
-                active_obj = subobj.activate(**activity_kw)
-                getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
-              else:
-                expanded_object_list.append((subobj, m.args, m.kw))
+            if alternate_method_id is not None \
+               and hasattr(aq_base(subobj), alternate_method_id):
+              # if this object is alternated,
+              # generate a new single active object
+              activity_kw = m.activity_kw.copy()
+              activity_kw.pop('group_method_id', None)
+              activity_kw.pop('group_id', None)
+              active_obj = subobj.activate(activity=activity, **activity_kw)
+              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
+            else:
+              expanded_object_list.append((subobj, m.args, m.kw))
           new_message_list.append((m, obj))
         except:
           m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)

Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,15 +7,14 @@ cache_time:0
 class_name:
 class_file:
 </dtml-comment>
-<params>uid_list</params>
+<params>table
+uid</params>
 UPDATE
-  message
+  <dtml-var table>
 SET
   processing_node=0,
   processing=0
 WHERE
-  uid IN (
-  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
-  )
+  <dtml-sqltest uid type="int" multiple>
 <dtml-var sql_delimiter>
 COMMIT

Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_processMessage.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,14 +7,14 @@ cache_time:0
 class_name:
 class_file:
 </dtml-comment>
-<params>uid</params>
-UPDATE message
+<params>table
+uid</params>
+UPDATE
+  <dtml-var table>
 SET
   processing_date = UTC_TIMESTAMP(),
   processing = 1
 WHERE
-  uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-  )
+  <dtml-sqltest uid type="int" multiple>
 <dtml-var sql_delimiter>
 COMMIT

Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,19 +7,22 @@ cache_time:0
 class_name:
 class_file:
 </dtml-comment>
-<params>processing_node
+<params>table
+processing_node
 to_date
 count
 group_method_id
 </params>
 UPDATE
-  message
+  <dtml-var table>
 SET
   processing_node=<dtml-sqlvar processing_node type="int">
 WHERE
   processing_node=0
   AND date <= <dtml-sqlvar to_date type="datetime">
-  <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
+  <dtml-if expr="group_method_id is not None">
+    AND group_method_id = <dtml-sqlvar group_method_id type="string">
+  </dtml-if>
 ORDER BY
 <dtml-comment>
   Explanation of the order by:

Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql (from r37685, erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql&r1=37685&r2=37686&rev=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -7,12 +7,13 @@ cache_time:0
 class_name:
 class_file:
 </dtml-comment>
-<params>processing_node
+<params>table
+processing_node
 count</params>
 SELECT
   *
 FROM
-  message_queue
+  <dtml-var table>
 WHERE
   processing_node = <dtml-sqlvar processing_node type="int">
 <dtml-if expr="count is not None">

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
-  message
-SET
-  processing_node=0,
-  processing=0
-WHERE
-  uid IN (
-  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
-  )
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid</params>
-UPDATE message
-SET
-  processing_date = UTC_TIMESTAMP(),
-  processing = 1
-WHERE
-  uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-  )
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql (removed)
@@ -1,36 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-group_method_id
-</params>
-UPDATE
-  message
-SET
-  processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
-  processing_node=0
-  AND date <= <dtml-sqlvar to_date type="datetime">
-  <dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
-ORDER BY
-<dtml-comment>
-  Explanation of the order by:
-  - priority must be respected (it is a feature)
-  - when multiple nodes simultaneously try to fetch activities, they should not
-    be given the same set of lines as it would cause all minus one to wait for
-    a write lock (and be ultimately aborted), effectively serializing their
-    action (so breaking paralellism).
-    So we must force MySQL to update lines in a random order.
-</dtml-comment>
-  priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
-  *
-FROM
-  message
-WHERE
-  processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
-  LIMIT <dtml-sqlvar count type="int">
-</dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
   `processing` TINYINT NOT NULL DEFAULT 0,
   `processing_date` DATETIME,
   `priority` TINYINT NOT NULL DEFAULT 0,
+  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
   `tag` VARCHAR(255) NOT NULL,
   `serialization_tag` VARCHAR(255) NOT NULL,
   `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql (removed)
@@ -1,21 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid_list</params>
-UPDATE
-  message_queue
-SET
-  processing_node=0,
-  processing=0
-WHERE
-  uid IN (
-  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
-  )
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql (removed)
@@ -1,34 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-to_date
-count
-</params>
-UPDATE
-  message_queue
-SET
-  processing_node=<dtml-sqlvar processing_node type="int">
-WHERE
-  processing_node=0
-  AND date <= <dtml-sqlvar to_date type="datetime">
-ORDER BY
-<dtml-comment>
-  Explanation of the order by:
-  - priority must be respected (it is a feature)
-  - when multiple nodes simultaneously try to fetch activities, they should not
-    be given the same set of lines as it would cause all minus one to wait for
-    a write lock (and be ultimately aborted), effectively serializing their
-    action (so breaking parallelism).
-    So we must force MySQL to update lines in a random order.
-</dtml-comment>
-  priority, RAND()
-LIMIT <dtml-sqlvar count type="int">
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql?rev=37685&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql (removed)
@@ -1,20 +0,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-count</params>
-SELECT
-  *
-FROM
-  message_queue
-WHERE
-  processing_node = <dtml-sqlvar processing_node type="int">
-<dtml-if expr="count is not None">
-  LIMIT <dtml-sqlvar count type="int">
-</dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessageList.zsql [utf8] Wed Aug 11 10:35:15 2010
@@ -15,11 +15,12 @@ message_list
 priority_list
 processing_node_list
 date_list
+group_method_id_list
 tag_list
 serialization_tag_list
 </params>
 INSERT INTO message_queue
-(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, tag, serialization_tag, message)
+(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
 VALUES
 <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
 <dtml-if sequence-start><dtml-else>,</dtml-if>
@@ -32,6 +33,7 @@ VALUES
   <dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
   0,
   <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
+  <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
   <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
   <dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
   <dtml-sqlvar expr="message_list[loop_item]" type="string">

Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=37686&r1=37685&r2=37686&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Wed Aug 11 10:35:15 2010
@@ -430,7 +430,7 @@ class TestCMFActivity(ERP5TypeTestCase, 
   def TryActiveProcessInsideActivity(self, activity):
     """
     Try two levels with active_process, we create one first
-    activity with an acitive process, then this new activity
+    activity with an active process, then this new activity
     uses another active process
     """
     portal = self.getPortal()
@@ -1872,27 +1872,26 @@ class TestCMFActivity(ERP5TypeTestCase, 
     getattr(organisation, 'uid')
 
 
-  def test_80_CallWithGroupIdParamater(self, quiet=0, run=run_all_test):
-    """
-    Test that group_id parameter is used to separate execution of the same method
-    """
+  def callWithGroupIdParamater(self, activity, quiet, run):
     if not run: return
     if not quiet:
-      message = '\nTest Activity with group_id parameter'
+      message = '\nTest Activity with group_id parameter (%s)' % activity
       ZopeTestCase._print(message)
       LOG('Testing... ',0,message)
 
     portal = self.getPortal()    
     organisation =  portal.organisation._getOb(self.company_id)
     # Defined a group method
+    foobar_list = []
     def setFoobar(self, object_list):
+      foobar_list.append(len(object_list))
       for obj, args, kw in object_list:
         number = kw.get('number', 1)
         if getattr(obj,'foobar', None) is not None:
           obj.foobar = obj.foobar + number
         else:
           obj.foobar = number
-      object_list[:] = []
+      del object_list[:]
     from Products.ERP5Type.Document.Folder import Folder
     Folder.setFoobar = setFoobar    
 
@@ -1905,46 +1904,65 @@ class TestCMFActivity(ERP5TypeTestCase, 
 
     # Test group_method_id is working without group_id
     for x in xrange(5):
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
       transaction.commit()      
 
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list),5)
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    self.assertEquals(1, organisation.getFoobar())
+    expected = dict(SQLDict=1, SQLQueue=5)[activity]
+    self.assertEquals(expected, organisation.getFoobar())
 
 
     # Test group_method_id is working with one group_id defined
     for x in xrange(5):
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
       transaction.commit()      
 
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list),5)
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    self.assertEquals(2, organisation.getFoobar())
+    self.assertEquals(expected * 2, organisation.getFoobar())
+
+    self.assertEquals([expected, expected], foobar_list)
+    del foobar_list[:]
 
     # Test group_method_id is working with many group_id defined
     for x in xrange(5):
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
       transaction.commit()      
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
       transaction.commit()
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
       transaction.commit()
-      organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
+      organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
       transaction.commit()
 
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list),20)
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    self.assertEquals(11, organisation.getFoobar())
+    self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
+                      organisation.getFoobar())
+    self.assertEquals(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity],
+                      sorted(foobar_list))
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list), 0)
-    
+
+  def test_80a_CallWithGroupIdParamaterSQLDict(self, quiet=0, run=run_all_test):
+    """
+    Test that group_id parameter is used to separate execution of the same method
+    """
+    self.callWithGroupIdParamater('SQLDict', quiet=quiet, run=run)
+
+  def test_80b_CallWithGroupIdParamaterSQLQueue(self, quiet=0,
+                                                run=run_all_test):
+    """
+    Test that group_id parameter is used to separate execution of the same method
+    """
+    self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
 
   def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
     """
@@ -3148,7 +3166,7 @@ class TestCMFActivity(ERP5TypeTestCase, 
     self.assertEqual(len(message_list), 1)
     message = message_list[0]
     portal.organisation_module._delOb(organisation.id)
-    activity_tool.invokeGroup('getTitle', [message])
+    activity_tool.invokeGroup('getTitle', [message], 'SQLDict', True)
     checkMessage(message, KeyError)
     activity_tool.manageCancel(message.object_path, message.method_id)
     # 2: activity method does not exist when activity is executed
@@ -3157,7 +3175,8 @@ class TestCMFActivity(ERP5TypeTestCase, 
     message_list = activity_tool.getMessageList()
     self.assertEqual(len(message_list), 1)
     message = message_list[0]
-    activity_tool.invokeGroup('this_method_does_not_exist', [message])
+    activity_tool.invokeGroup('this_method_does_not_exist',
+                              [message], 'SQLDict', True)
     checkMessage(message, KeyError)
     activity_tool.manageCancel(message.object_path, message.method_id)
 
@@ -3739,7 +3758,6 @@ class TestCMFActivity(ERP5TypeTestCase, 
       LOG('Testing... ',0,message)
     self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict')
 
-  @expectedFailure
   def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test):
     if not run: return
     if not quiet:




More information about the Erp5-report mailing list