[Erp5-report] r14039 - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Apr 11 03:22:01 CEST 2007


Author: yo
Date: Wed Apr 11 03:21:56 2007
New Revision: 14039

URL: http://svn.erp5.org?rev=14039&view=rev
Log:
This big change optimizes the scheduling of active objects,
and fixes some bugs.

The basic idea is to track a dependency graph to find executable
messages quickly. This makes the activity system far more efficient,
when you have many inter-dependent messages queued in the tables.

Also, this obsoletes the time shifting in the schedulers,
as executable messages can be found in a more efficient manner.
So the activity parameter "at_date" should now work as expected.

Now the API of validate methods in Activities returns a
list of message objects instead of a boolean value. Such
a list contains messages that are depended upon by a given
message.

The validate method in Message accepts a new optional
parameter, check_order_validation, to indicate whether
order validation should be performed. The default behavior
has not changed.

getDependentMessageList is added to ActivityTool, Queue
and Message. This method collects dependent messages for
a given message from all activities.

There are some other subtle changes. Look at the diffs for
more details.

Modified:
    erp5/trunk/products/CMFActivity/Activity/Queue.py
    erp5/trunk/products/CMFActivity/Activity/RAMDict.py
    erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
    erp5/trunk/products/CMFActivity/ActivityTool.py
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql

Modified: erp5/trunk/products/CMFActivity/Activity/Queue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/Queue.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/Queue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/Queue.py Wed Apr 11 03:21:56 2007
@@ -26,12 +26,17 @@
 #
 ##############################################################################
 
-import pickle, sys
-from Acquisition import aq_base
+import cPickle, sys
 from DateTime import DateTime
-from Products.CMFActivity.ActivityTool import Message
-from zLOG import LOG
+from zLOG import LOG, WARNING, ERROR
 from ZODB.POSException import ConflictError
+import sha
+from cStringIO import StringIO
+
+try:
+  from transaction import get as get_transaction
+except ImportError:
+  pass
 
 # Error values for message validation
 EXCEPTION      = -1
@@ -93,14 +98,14 @@
 
   def queueMessage(self, activity_tool, m):    
     activity_tool.deferredQueueMessage(self, m)  
-  
+
   def deleteMessage(self, activity_tool, m):
     if not getattr(m, 'is_deleted', 0):
       # We try not to delete twice
       # However this can not be garanteed in the case of messages loaded from SQL
       activity_tool.deferredDeleteMessage(self, m)  
     m.is_deleted = 1
-    
+
   def dequeueMessage(self, activity_tool, processing_node):
     pass
 
@@ -122,45 +127,102 @@
     self.is_awake[processing_node] = 0
     self.is_alive[processing_node] = 0
 
-  def validate(self, activity_tool, message, **kw):
+  def validate(self, activity_tool, message, check_order_validation=1, **kw):
     """
       This is the place where activity semantics is implemented
       **kw contains all parameters which allow to implement synchronisation,
       constraints, delays, etc.
-      
+
       Standard synchronisation parameters:
-      
+
       after_method_id   --  never validate message if after_method_id
                             is in the list of methods which are
                             going to be executed
-    
+
       after_message_uid --  never validate message if after_message_uid
                             is in the list of messages which are
                             going to be executed
-    
+
       after_path        --  never validate message if after_path
                             is in the list of path which are
-                            going to be executed                                                        
+                            going to be executed
     """
     try:
       if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
         # Do not try to call methods on objects which do not exist
-        LOG('WARNING ActivityTool', 0,
+        LOG('CMFActivity', WARNING,
            'Object %s does not exist' % '/'.join(message.object_path))
         return INVALID_PATH
-      for k, v in kw.items():
-        if activity_tool.validateOrder(message, k, v):
-          return INVALID_ORDER
+      if check_order_validation:
+        for k, v in kw.iteritems():
+          if activity_tool.validateOrder(message, k, v):
+            return INVALID_ORDER
     except ConflictError:
       raise
     except:
-      LOG('WARNING ActivityTool', 0,
+      LOG('CMFActivity', WARNING,
           'Validation of Object %s raised exception' % '/'.join(message.object_path),
           error=sys.exc_info())
       # Do not try to call methods on objects which cause errors
       return EXCEPTION
     return VALID
 
+  def getDependentMessageList(self, activity_tool, message, **kw):
+    message_list = []
+    for k, v in kw.iteritems():
+      result = activity_tool.getDependentMessageList(message, k, v)
+      if result:
+        message_list.extend(result)
+    return message_list
+
+  def getExecutableMessageList(self, activity_tool, message, message_dict,
+                               validation_text_dict):
+    """Get messages which have no dependent message, and store them in the dictionary.
+
+    If the passed message itself is executable, simply store only that message.
+    Otherwise, try to find at least one message executable from dependent messages.
+
+    This may result in no new message, if all dependent messages are already present
+    in the dictionary, if all dependent messages are in different activities, or if
+    the message has a circular dependency.
+
+    The validation text dictionary is used only to cache the results of validations,
+    in order to reduce the number of SQL queries.
+    """
+    if message.uid in message_dict:
+      # Nothing to do. But detect a circular dependency.
+      if message_dict[message.uid] is None:
+        LOG('CMFActivity', ERROR,
+            'message uid %r has a circular dependency' % (message.uid,))
+      return
+
+    cached_result = validation_text_dict.get(message.order_validation_text)
+    if cached_result is None:
+      message_list = message.getDependentMessageList(self, activity_tool)
+      get_transaction().commit() # Release locks.
+      if message_list:
+        # The result is not empty, so this message is not executable.
+        validation_text_dict[message.order_validation_text] = 0
+        now_date = DateTime()
+        for activity, m in message_list:
+          # Note that the messages may contain ones which are already assigned or not
+          # executable yet.
+          if activity is self and m.processing_node == -1 and m.date <= now_date:
+            # Call recursively. Set None as a marker to detect a circular dependency.
+            message_dict[message.uid] = None
+            try:
+              self.getExecutableMessageList(activity_tool, m, message_dict,
+                                             validation_text_dict)
+            finally:
+              del message_dict[message.uid]
+      else:
+        validation_text_dict[message.order_validation_text] = 1
+        message_dict[message.uid] = message
+    elif cached_result:
+      message_dict[message.uid] = message
+    else:
+      pass
+
   def isAwake(self, activity_tool, processing_node):
     return self.is_awake[processing_node]
 
@@ -179,22 +241,40 @@
     pass
 
   def loadMessage(self, s, **kw):
-    m = pickle.loads(s)
+    m = cPickle.load(StringIO(s))
     m.__dict__.update(kw)
     return m
 
   def dumpMessage(self, m):
-    return pickle.dumps(m)
+    return cPickle.dumps(m)
+
+  def getOrderValidationText(self, message):
+    # Return an identifier of validators related to ordering.
+    order_validation_item_list = []
+    key_list = message.activity_kw.keys()
+    key_list.sort()
+    for key in key_list:
+      method_id = "_validate_%s" % key
+      if hasattr(self, method_id):
+        order_validation_item_list.append((key, message.activity_kw[key]))
+    if len(order_validation_item_list) == 0:
+      # When no order validation argument is specified, skip the computation
+      # of the checksum for speed. Here, 'none' is used, because this never be
+      # identical to SHA1 hexdigest (which is always 40 characters), and 'none'
+      # is true in Python. This is important, because dtml-if assumes that an empty
+      # string is false, so we must use a non-empty string for this.
+      return 'none'
+    return sha.new(repr(order_validation_item_list)).hexdigest()
 
   def getMessageList(self, activity_tool, processing_node=None,**kw):
-    return []  
+    return []
 
   def countMessage(self, activity_tool,**kw):
     return 0
 
   def countMessageWithTag(self, activity_tool,value):
     return 0
-  
+
   # Transaction Management
   def prepareQueueMessage(self, activity_tool, m):
     # Called to prepare transaction commit for queued messages

Modified: erp5/trunk/products/CMFActivity/Activity/RAMDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMDict.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMDict.py Wed Apr 11 03:21:56 2007
@@ -27,8 +27,8 @@
 ##############################################################################
 
 from Products.CMFActivity.ActivityTool import registerActivity
+from Products.CMFActivity.Errors import ActivityFlushError
 from Queue import Queue, VALID
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 
 from zLOG import LOG
 

Modified: erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMQueue.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMQueue.py Wed Apr 11 03:21:56 2007
@@ -28,7 +28,6 @@
 
 from Products.CMFActivity.ActivityTool import registerActivity
 from Queue import Queue, VALID
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 
 try:
   from transaction import get as get_transaction

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Wed Apr 11 03:21:56 2007
@@ -26,17 +26,15 @@
 #
 ##############################################################################
 
-import random
 from DateTime import DateTime
 from Products.CMFActivity.ActivityTool import registerActivity
-from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
+from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
 from RAMDict import RAMDict
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
+from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 from Products.CMFActivity.Errors import ActivityFlushError
 from ZODB.POSException import ConflictError
 import sys
-import sha
-from types import ClassType, StringType, ListType, TupleType
+from types import ClassType
 
 try:
   from transaction import get as get_transaction
@@ -141,50 +139,22 @@
     message_list = activity_buffer.getMessageList(self)
     return [m for m in message_list if m.is_registered]
 
-  def getOrderValidationText(self, message):
-    # Return an identifier of validators related to ordering.
-    order_validation_item_list = []
-    key_list = message.activity_kw.keys()
-    key_list.sort()
-    for key in key_list:
-      method_id = "_validate_%s" % key
-      if hasattr(self, method_id):
-        order_validation_item_list.append((key, message.activity_kw[key]))
-    if len(order_validation_item_list) == 0:
-      # When no order validation argument is specified, skip the computation
-      # of the checksum for speed. Here, 'none' is used, because this never be
-      # identical to SHA1 hexdigest (which is always 40 characters), and 'none'
-      # is true in Python. This is important, because dtml-if assumes that an empty
-      # string is false, so we must use a non-empty string for this.
-      return 'none'
-    return sha.new(repr(order_validation_item_list)).hexdigest()
-
   def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
-    validation_state = message.validate(self, activity_tool)
+    validation_state = message.validate(self, activity_tool, check_order_validation=0)
     if validation_state is not VALID:
-      if validation_state in (EXCEPTION, INVALID_PATH):
-        # There is a serious validation error - we must lower priority
-        if priority > MAX_PRIORITY:
-          # This is an error
-          if len(uid_list) > 0:
-            #LOG('SQLDict', 0, 'error uid_list = %r' % (uid_list,))
-            activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
-                                                                            # Assign message back to 'error' state
-          #m.notifyUser(activity_tool)                                      # Notify Error
-          get_transaction().commit()                                        # and commit
-        else:
-          #LOG('SQLDict', 0, 'lower priority uid_list = %r' % (uid_list,))
-          # Lower priority
-          if len(uid_list) > 0: # Add some delay before new processing
-            activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
-                                              priority = priority + 1, retry = 1)
-          get_transaction().commit() # Release locks before starting a potentially long calculation
+      # There is a serious validation error - we must lower priority
+      if priority > MAX_PRIORITY:
+        # This is an error
+        if len(uid_list) > 0:
+          activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=VALIDATE_ERROR_STATE)
+                                                                          # Assign message back to 'error' state
+        #m.notifyUser(activity_tool)                                      # Notify Error
+        get_transaction().commit()                                        # and commit
       else:
-        # We do not lower priority for INVALID_ORDER errors but we do postpone execution
-        #order_validation_text = self.getOrderValidationText(message)
-        activity_tool.SQLDict_setPriority(uid = uid_list,
-                                          delay = VALIDATION_ERROR_DELAY,
-                                          retry = 1)
+        # Lower priority
+        if len(uid_list) > 0: # Add some delay before new processing
+          activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
+                                            priority=priority + 1, retry=1)
         get_transaction().commit() # Release locks before starting a potentially long calculation
       return 0
     return 1
@@ -196,41 +166,29 @@
       return 1
 
     now_date = DateTime()
-    priority = random.choice(priority_weight)
-    # Try to find a message at given priority level which is scheduled for now
-    result = readMessage(processing_node=processing_node, priority=priority,
-                         to_date=now_date)
-    if len(result) == 0:
-      # If empty, take any message which is scheduled for now
-      priority = None
-      result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
-    if len(result) == 0:
-      # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
-      # objects quickly.
-      self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
-    else:
-      #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
+    result = readMessage(processing_node=processing_node, to_date=now_date)
+    if len(result) > 0:
       line = result[0]
       path = line.path
       method_id = line.method_id
       order_validation_text = line.order_validation_text
-      uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
-                                                   processing_node = None, to_date = now_date,
-                                                   order_validation_text = order_validation_text)
+      uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
+                                                   processing_node=None, to_date=now_date,
+                                                   order_validation_text=order_validation_text)
       uid_list = [x.uid for x in uid_list]
       uid_list_list = [uid_list]
       priority_list = [line.priority]
       # Make sure message can not be processed anylonger
       if len(uid_list) > 0:
         # Set selected messages to processing
-        activity_tool.SQLDict_processMessage(uid = uid_list)
+        activity_tool.SQLDict_processMessage(uid=uid_list)
       get_transaction().commit() # Release locks before starting a potentially long calculation
       # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
 
       # At this point, messages are marked as processed. So catch any kind of exception to make sure
       # that they are unmarked on error.
       try:
-        m = self.loadMessage(line.message, uid = line.uid)
+        m = self.loadMessage(line.message, uid=line.uid)
         message_list = [m]
         # Validate message (make sure object exists, priority OK, etc.)
         if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
@@ -244,13 +202,13 @@
           else:
             count = 1
 
-          group_method = activity_tool.restrictedTraverse(group_method_id)
+          group_method = activity_tool.getPortalObject().restrictedTraverse(group_method_id)
 
           if count < MAX_GROUPED_OBJECTS:
             # Retrieve objects which have the same group method.
-            result = readMessage(processing_node = processing_node, priority = priority,
-                                to_date = now_date, group_method_id = group_method_id,
-                                order_validation_text = order_validation_text)
+            result = readMessage(processing_node=processing_node,
+                                 to_date=now_date, group_method_id=group_method_id,
+                                 order_validation_text=order_validation_text)
             #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
             path_and_method_id_dict = {}
             for line in result:
@@ -263,19 +221,20 @@
                 continue
               path_and_method_id_dict[key] = 1
 
-              uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
-                                                            processing_node = None, to_date = now_date,
-                                                            order_validation_text = order_validation_text)
+              uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
+                                                           processing_node=None,
+                                                           to_date=now_date,
+                                                           order_validation_text=order_validation_text)
               uid_list = [x.uid for x in uid_list]
               if len(uid_list) > 0:
                 # Set selected messages to processing
-                activity_tool.SQLDict_processMessage(uid = uid_list)
+                activity_tool.SQLDict_processMessage(uid=uid_list)
               get_transaction().commit() # Release locks before starting a potentially long calculation
 
               # Save this newly marked uids as soon as possible.
               uid_list_list.append(uid_list)
 
-              m = self.loadMessage(line.message, uid = line.uid)
+              m = self.loadMessage(line.message, uid=line.uid)
               if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
                 if m.hasExpandMethod():
                   count += len(m.getObjectList(activity_tool))
@@ -321,7 +280,8 @@
           get_transaction().abort()
         except:
           # Unfortunately, database adapters may raise an exception against abort.
-          LOG('SQLDict', WARNING, 'abort failed, thus some objects may be modified accidentally')
+          LOG('SQLDict', WARNING,
+              'abort failed, thus some objects may be modified accidentally')
           pass
 
         # An exception happens at somewhere else but invoke or invokeGroup, so messages
@@ -330,10 +290,11 @@
           for uid_list in uid_list_list:
             if len(uid_list):
               # This only sets processing to zero.
-              activity_tool.SQLDict_setPriority(uid = uid_list)
+              activity_tool.SQLDict_setPriority(uid=uid_list)
               get_transaction().commit()
         except:
-          LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
+          LOG('SQLDict', ERROR,
+              'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
               error=sys.exc_info())
           raise
         return 0
@@ -345,8 +306,8 @@
           priority = priority_list[i]
           if m.is_executed:
             if len(uid_list) > 0:
-              activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
-            get_transaction().commit()                                        # If successful, commit
+              activity_tool.SQLDict_delMessage(uid=uid_list)       # Delete it
+            get_transaction().commit()                             # If successful, commit
             if m.active_process:
               active_process = activity_tool.unrestrictedTraverse(m.active_process)
               if not active_process.hasActivity():
@@ -355,24 +316,25 @@
           else:
             if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
               # If this is a conflict error, do not lower the priority but only delay.
-              activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
-                                                retry = 1)
+              activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY)
               get_transaction().commit() # Release locks before starting a potentially long calculation
             elif priority > MAX_PRIORITY:
               # This is an error
               if len(uid_list) > 0:
-                activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
+                activity_tool.SQLDict_assignMessage(uid=uid_list,
+                                                    processing_node=INVOKE_ERROR_STATE)
                                                                                 # Assign message back to 'error' state
               m.notifyUser(activity_tool)                                       # Notify Error
               get_transaction().commit()                                        # and commit
             else:
               # Lower priority
               if len(uid_list) > 0:
-                activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
-                                                  priority = priority + 1, retry = 1)
-                get_transaction().commit() # Release locks before starting a potentially long calculation
+                activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
+                                                  priority=priority + 1)
+              get_transaction().commit() # Release locks before starting a potentially long calculation
       except:
-        LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
+        LOG('SQLDict', ERROR,
+            'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
             error=sys.exc_info())
         raise
 
@@ -411,7 +373,7 @@
     if readMessageList is not None:
       # Parse each message in registered
       for m in activity_tool.getRegisteredMessageList(self):
-        if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
+        if m.object_path == object_path and (method_id is None or method_id == m.method_id):
           #if not method_dict.has_key(method_id or m.method_id):
           if not method_dict.has_key(m.method_id):
             method_dict[m.method_id] = 1 # Prevents calling invoke twice
@@ -469,13 +431,13 @@
         if len(uid_list)>0:
           activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list])
 
-  def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
+  def getMessageList(self, activity_tool, processing_node=None, include_processing=0, **kw):
     # YO: reading all lines might cause a deadlock
     message_list = []
     readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
     if readMessageList is not None:
       result = readMessageList(path=None, method_id=None, processing_node=None,
-                               to_processing_date=None,include_processing=include_processing)
+                               to_processing_date=None, include_processing=include_processing)
       for line in result:
         m = self.loadMessage(line.message, uid = line.uid)
         m.processing_node = line.processing_node
@@ -496,145 +458,155 @@
     return message_list
 
   def distribute(self, activity_tool, node_count):
-    processing_node = 1
     readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
     if readMessageList is not None:
       now_date = DateTime()
-      if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
-        # Sticky processing messages should be set back to non processing
-        max_processing_date = now_date - MAX_PROCESSING_TIME
-        self.max_processing_date = now_date
-      else:
-        max_processing_date = None
-      result = readMessageList(path=None, method_id=None, processing_node = -1,
-                               to_processing_date = max_processing_date,
-                               include_processing=0) # Only assign non assigned messages
-      get_transaction().commit() # Release locks before starting a potentially long calculation
+      result = readMessageList(path=None, method_id=None, processing_node=-1,
+                               to_date=now_date, include_processing=0)
+      get_transaction().commit()
+
+      validation_text_dict = {'none': 1}
+      message_dict = {}
+      for line in result:
+        message = self.loadMessage(line.message, uid = line.uid,
+                                   order_validation_text = line.order_validation_text)
+        self.getExecutableMessageList(activity_tool, message, message_dict,
+                                      validation_text_dict)
+
+      # XXX probably this below can be optimized by assigning multiple messages at a time.
       path_dict = {}
-      for line in result:
-        path = line.path
-        broadcast = line.broadcast
+      assignMessage = activity_tool.SQLDict_assignMessage
+      processing_node = 1
+      for message in message_dict.itervalues():
+        path = '/'.join(message.object_path)
+        broadcast = message.activity_kw.get('broadcast', 0)
         if broadcast:
           # Broadcast messages must be distributed into all nodes.
-          uid = line.uid
-          activity_tool.SQLDict_assignMessage(processing_node=1, uid=[uid])
+          uid = message.uid
+          assignMessage(processing_node=1, uid=[uid])
           if node_count > 1:
-            uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', id_count=node_count - 1)
-            for node in range(2, node_count+1):
-              activity_tool.SQLDict_writeMessage( uid = uid_list.pop(),
-                                                  path = path,
-                                                  method_id = line.method_id,
-                                                  priority = line.priority,
-                                                  broadcast = 1,
-                                                  processing_node = node,
-                                                  message = line.message,
-                                                  date = line.date)
-        elif not path_dict.has_key(path):
-          # Only assign once (it would be different for a queue)
-          path_dict[path] = 1
-          activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None, broadcast=0)
+            id_tool = activity_tool.getPortalObject().portal_ids
+            uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
+                                                       id_count=node_count - 1)
+            path_list = [path] * (node_count - 1)
+            method_id_list = [message.method_id] * (node_count - 1)
+            priority_list = [message.activity_kw.get('priority', 1)] * (node_count - 1)
+            processing_node_list = range(2, node_count + 1)
+            broadcast_list = [1] * (node_count - 1)
+            message_list = [self.dumpMessage(message)] * (node_count - 1)
+            date_list = [message.activity_kw.get('at_date', now_date)] * (node_count - 1)
+            group_method_id_list = [message.activity_kw.get('group_method_id', '')] * (node_count - 1)
+            tag_list = [message.activity_kw.get('tag', '')] * (node_count - 1)
+            order_validation_text_list = [message.order_validation_text] * (node_count - 1)
+            activity_tool.SQLDict_writeMessageList(uid_list=uid_list,
+                                                   path_list=path_list,
+                                                   method_id_list=method_id_list,
+                                                   priority_list=priority_list,
+                                                   broadcast_list=broadcast_list,
+                                                   processing_node_list=processing_node_list,
+                                                   message_list=message_list,
+                                                   date_list=date_list,
+                                                   group_method_id_list=group_method_id_list,
+                                                   tag_list=tag_list,
+                                                   order_validation_text_list=order_validation_text_list)
+          get_transaction().commit()
+        else:
+          # Select a processing node. If the same path appears again, dispatch the message to
+          # the same node, so that object caching is more efficient. Otherwise, apply a round
+          # robin scheduling.
+          node = path_dict.get(path)
+          if node is None:
+            node = processing_node
+            path_dict[path] = node
+            processing_node += 1
+            if processing_node > node_count:
+              processing_node = 1
+
+          assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
           get_transaction().commit() # Release locks immediately to allow processing of messages
-          processing_node = processing_node + 1
-          if processing_node > node_count:
-            processing_node = 1 # Round robin
 
   # Validation private methods
+  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
+    if isinstance(method_id, str):
+      method_id = [method_id]
+    if isinstance(path, str):
+      path = [path]
+    if isinstance(tag, str):
+      tag = [tag]
+
+    if method_id or message_uid or path or tag:
+      validateMessageList = activity_tool.SQLDict_validateMessageList
+      result = validateMessageList(method_id=method_id,
+                                   message_uid=message_uid,
+                                   path=path,
+                                   tag=tag)
+      message_list = []
+      for line in result:
+        m = self.loadMessage(line.message,
+                             uid=line.uid,
+                             order_validation_text=line.order_validation_text,
+                             date=line.date,
+                             processing_node=line.processing_node)
+        message_list.append(m)
+      return message_list
+    else:
+      return []
+
   def _validate_after_method_id(self, activity_tool, message, value):
-    # Count number of occurances of method_id
-    if type(value) is StringType:
-      value = [value]
-    if len(value)>0: # if empty list provided, the message is valid
-      result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None)
-      if result[0].uid_count > 0:
-        return INVALID_ORDER
-    return VALID
+    return self._validate(activity_tool, method_id=value)
 
   def _validate_after_path(self, activity_tool, message, value):
-    # Count number of occurances of path
-    if type(value) is StringType:
-      value = [value]
-    if len(value)>0: # if empty list provided, the message is valid
-      result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
-      if result[0].uid_count > 0:
-        return INVALID_ORDER
-    return VALID
+    return self._validate(activity_tool, path=value)
 
   def _validate_after_message_uid(self, activity_tool, message, value):
-    # Count number of occurances of message_uid
-    result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    return self._validate(activity_tool, message_uid=value)
 
   def _validate_after_path_and_method_id(self, activity_tool, message, value):
-    # Count number of occurances of path and method_id
-    if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
-      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method_id : %s' % repr(value))
-      return VALID
-    path = value[0]
-    method = value[1]
-    if type(path) is StringType:
+    if not isinstance(value, (tuple, list)) or len(value) < 2:
+      LOG('CMFActivity', WARNING,
+          'unable to recognize value for after_path_and_method_id: %r' % (value,))
+      return []
+    return self._validate(activity_tool, path=value[0], method_id=value[1])
+
+  def _validate_after_tag(self, activity_tool, message, value):
+    return self._validate(activity_tool, tag=value)
+
+  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
+    # Count number of occurances of tag and method_id
+    if not isinstance(value, (tuple, list)) or len(value) < 2:
+      LOG('CMFActivity', WARNING,
+          'unable to recognize value for after_tag_and_method_id: %r' % (value,))
+      return []
+    return self._validate(activity_tool, tag=value[0], method_id=value[1])
+
+  def countMessage(self, activity_tool, tag=None, path=None,
+                   method_id=None, message_uid=None, **kw):
+    """Return the number of messages which match the given parameters.
+    """
+    if isinstance(tag, str):
+      tag = [tag]
+    if isinstance(path, str):
       path = [path]
-    if type(method) is StringType:
-      method = [method]
-    result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, path=path)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
-
-  def _validate_after_tag(self, activity_tool, message, value):
-    # Count number of occurances of tag
-    if self.countMessageWithTag(activity_tool, value) > 0:
-      return INVALID_ORDER
-    return VALID
-
-  def countMessage(self, activity_tool, tag=None,path=None,
-                   method_id=None,message_uid=None,**kw):
-    """
-      Return the number of message which match the given parameter.
-    """
-    if isinstance(tag, StringType):
-      tag = [tag]
-    if isinstance(path, StringType):
-      path = [path]
-    if isinstance(message_uid, (int,long)):
-      message_uid = [message_uid]
-    if isinstance(method_id, StringType):
+    elif isinstance(method_id, str):
       method_id = [method_id]
     result = activity_tool.SQLDict_validateMessageList(method_id=method_id, 
                                                        path=path,
                                                        message_uid=message_uid, 
-                                                       tag=tag)
+                                                       tag=tag,
+                                                       count=1)
     return result[0].uid_count
 
   def countMessageWithTag(self, activity_tool, value):
+    """Return the number of messages which match the given tag.
     """
-      Return the number of message which match the given tag.
-    """
-    return self.countMessage(activity_tool,tag=value)
-
-  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
-    # Count number of occurances of tag and method_id
-    if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
-      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
-      return VALID
-    tag = value[0]
-    method = value[1]
-    if type(tag) is StringType:
-      tag = [tag]
-    if type(method) is StringType:
-      method = [method]
-    result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, tag=tag)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    return self.countMessage(activity_tool, tag=value)
 
   # Required for tests (time shift)
-  def timeShift(self, activity_tool, delay, processing_node=None,retry=None):
+  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
     """
       To simulate timeShift, we simply substract delay from
       all dates in SQLDict message table
     """
-    activity_tool.SQLDict_timeShift(delay = delay, processing_node = processing_node,retry=retry)
+    activity_tool.SQLDict_timeShift(delay=delay, processing_node=processing_node,retry=retry)
 
 registerActivity(SQLDict)

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Wed Apr 11 03:21:56 2007
@@ -26,15 +26,14 @@
 #
 ##############################################################################
 
-import random
 from Products.CMFActivity.ActivityTool import registerActivity
 from RAMQueue import RAMQueue
 from DateTime import DateTime
-from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
+from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
+from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 from Products.CMFActivity.Errors import ActivityFlushError
 from ZODB.POSException import ConflictError
-from types import StringType, ClassType
+from types import ClassType
 import sys
 
 try:
@@ -61,8 +60,9 @@
   """
   def prepareQueueMessage(self, activity_tool, m):
     if m.is_registered:
-      #import pdb; pdb.set_trace()
-      activity_tool.SQLQueue_writeMessage(uid = activity_tool.getPortalObject().portal_ids.generateNewLengthId(id_group='portal_activity_queue'),
+      id_tool = activity_tool.getPortalObject().portal_ids
+      uid = id_tool.generateNewLengthId(id_group='portal_activity_queue')
+      activity_tool.SQLQueue_writeMessage(uid = uid,
                                           path = '/'.join(m.object_path) ,
                                           method_id = m.method_id,
                                           priority = m.activity_kw.get('priority', 1),
@@ -84,13 +84,7 @@
     now_date = DateTime()
     # Next processing date in case of error
     next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
-    priority = random.choice(priority_weight)
-    # Try to find a message at given priority level
-    result = readMessage(processing_node=processing_node, priority=priority,
-                         to_date=now_date)
-    if len(result) == 0:
-      # If empty, take any message
-      result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
+    result = readMessage(processing_node=processing_node, to_date=now_date)
     if len(result) > 0:
       line = result[0]
       path = line.path
@@ -103,23 +97,17 @@
       try:
         m = self.loadMessage(line.message)
         # Make sure object exists
-        validation_state = m.validate(self, activity_tool)
+        validation_state = m.validate(self, activity_tool, check_order_validation=0)
         if validation_state is not VALID:
-          if validation_state in (EXCEPTION, INVALID_PATH):
-            if line.priority > MAX_PRIORITY:
-              # This is an error.
-              # Assign message back to 'error' state.
-              activity_tool.SQLQueue_assignMessage(uid=line.uid,
-                                                   processing_node = VALIDATE_ERROR_STATE)
-              get_transaction().commit()                                        # and commit
-            else:
-              # Lower priority
-              activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
-              get_transaction().commit() # Release locks before starting a potentially long calculation
+          if line.priority > MAX_PRIORITY:
+            # This is an error.
+            # Assign message back to 'error' state.
+            activity_tool.SQLQueue_assignMessage(uid=line.uid,
+                                                 processing_node=VALIDATE_ERROR_STATE)
+            get_transaction().commit()                                        # and commit
           else:
-            # We do not lower priority for INVALID_ORDER errors but we do postpone execution
-            activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
-                                                priority = line.priority)
+            # Lower priority
+            activity_tool.SQLQueue_setPriority(uid=line.uid, priority=line.priority + 1)
             get_transaction().commit() # Release locks before starting a potentially long calculation
           return 0
 
@@ -139,8 +127,8 @@
         # An exception happens at somewhere else but invoke, so messages
         # themselves should not be delayed.
         try:
-          activity_tool.SQLQueue_setPriority(uid = line.uid, date = line.date,
-                                             priority = line.priority)
+          activity_tool.SQLQueue_setPriority(uid=line.uid, date=line.date,
+                                             priority=line.priority)
           get_transaction().commit()
         except:
           LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised, and cannot even set processing to zero due to an exception',
@@ -159,25 +147,26 @@
             # Unfortunately, database adapters may raise an exception against abort.
             LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
             pass
-  
+
           if type(m.exc_type) is ClassType \
                   and issubclass(m.exc_type, ConflictError):
-            activity_tool.SQLQueue_setPriority(uid = line.uid, 
-                                              date = next_processing_date,
-                                              priority = line.priority)
+            activity_tool.SQLQueue_setPriority(uid=line.uid,
+                                               date=next_processing_date,
+                                               priority=line.priority)
           elif line.priority > MAX_PRIORITY:
             # This is an error
-            activity_tool.SQLQueue_assignMessage(uid = line.uid, 
-                                                processing_node = INVOKE_ERROR_STATE)
+            activity_tool.SQLQueue_assignMessage(uid=line.uid,
+                                                 processing_node=INVOKE_ERROR_STATE)
                                                                               # Assign message back to 'error' state
             m.notifyUser(activity_tool)                                       # Notify Error
           else:
             # Lower priority
-            activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
-                                                priority = line.priority + 1)
+            activity_tool.SQLQueue_setPriority(uid=line.uid, date=next_processing_date,
+                                               priority=line.priority + 1)
         get_transaction().commit()
       except:
-        LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
+        LOG('SQLQueue', ERROR,
+            'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
             error=sys.exc_info())
         raise
       return 0
@@ -215,32 +204,45 @@
       # Parse each message in registered
       for m in activity_tool.getRegisteredMessageList(self):
         if object_path == m.object_path and (method_id is None or method_id == m.method_id):
-          if invoke: activity_tool.invoke(m)
-          activity_tool.unregisterMessage(self, m)
-      # Parse each message in SQL queue
-      #LOG('Flush', 0, str((path, invoke, method_id)))
-      result = readMessageList(path=path, method_id=method_id,processing_node=None)
-      #LOG('Flush', 0, str(len(result)))
-      method_dict = {}
-      for line in result:
-        path = line.path
-        method_id = line.method_id
-        if not method_dict.has_key(method_id):
-          # Only invoke once (it would be different for a queue)
-          method_dict[method_id] = 1
-          m = self.loadMessage(line.message, uid = line.uid)
           if invoke:
             # First Validate
-            if m.validate(self, activity_tool) is VALID:
+            validate_value = m.validate(self, activity_tool)
+            if validate_value is VALID:
               activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
               if not m.is_executed:                                                 # Make sure message could be invoked
                 # The message no longer exists
                 raise ActivityFlushError, (
-                    'Could not evaluate %s on %s' % (method_id , path))
-            else:
+                    'Could not evaluate %s on %s' % (m.method_id , path))
+            elif validate_value is INVALID_PATH:
               # The message no longer exists
               raise ActivityFlushError, (
                   'The document %s does not exist' % path)
+            else:
+              raise ActivityFlushError, (
+                  'Could not validate %s on %s' % (m.method_id , path))
+          activity_tool.unregisterMessage(self, m)
+      # Parse each message in SQL queue
+      result = readMessageList(path=path, method_id=method_id, processing_node=None)
+      for line in result:
+        path = line.path
+        method_id = line.method_id
+        m = self.loadMessage(line.message, uid = line.uid)
+        if invoke:
+          # First Validate
+          validate_value = m.validate(self, activity_tool)
+          if validate_value is VALID:
+            activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
+            if not m.is_executed:                                                 # Make sure message could be invoked
+              # The message no longer exists
+              raise ActivityFlushError, (
+                  'Could not evaluate %s on %s' % (method_id , path))
+          elif validate_value is INVALID_PATH:
+            # The message no longer exists
+            raise ActivityFlushError, (
+                'The document %s does not exist' % path)
+          else:
+            raise ActivityFlushError, (
+                'Could not validate %s on %s' % (m.method_id , path))
 
       if len(result):
         activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
@@ -265,31 +267,27 @@
         message_list.append(m)
     return message_list
 
-  def countMessage(self, activity_tool, tag=None,path=None,
-                   method_id=None,message_uid=None,**kw):
-    """
-      Return the number of message which match the given parameter.
-    """
-    if isinstance(tag, StringType):
+  def countMessage(self, activity_tool, tag=None, path=None,
+                   method_id=None, message_uid=None, **kw):
+    """Return the number of messages which match the given parameters.
+    """
+    if isinstance(tag, str):
       tag = [tag]
-    if isinstance(path, StringType):
+    if isinstance(path, str):
       path = [path]
-    if isinstance(message_uid, (int,long)):
-      message_uid = [message_uid]
-    if isinstance(method_id, StringType):
+    if isinstance(method_id, str):
       method_id = [method_id]
     result = activity_tool.SQLQueue_validateMessageList(method_id=method_id, 
-                                                       path=path,
-                                                       message_uid=message_uid, 
-                                                       tag=tag)
+                                                        path=path,
+                                                        message_uid=message_uid, 
+                                                        tag=tag,
+                                                        count=1)
     return result[0].uid_count
 
   def countMessageWithTag(self, activity_tool, value):
-    """
-      Return the number of message which match the given tag.
-    """
-    return self.countMessage(activity_tool,tag=value)
-
+    """Return the number of messages which match the given tag.
+    """
+    return self.countMessage(activity_tool, tag=value)
 
   def dumpMessageList(self, activity_tool):
     # Dump all messages in the table.
@@ -306,105 +304,115 @@
     processing_node = 1
     readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
     if readMessageList is not None:
-      result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
-      #LOG('distribute count',0,str(len(result)) )
-      #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
-      #get_transaction().commit() # Release locks before starting a potentially long calculation
-      result = list(result)[0:100]
+      now_date = DateTime()
+      result = readMessageList(path=None, method_id=None,
+                               processing_node=-1, to_date=now_date)
+      get_transaction().commit()
+
+      validation_text_dict = {'none': 1}
+      message_dict = {}
       for line in result:
-        broadcast = line.broadcast
-        uid = line.uid
+        message = self.loadMessage(line.message, uid=line.uid)
+        message.order_validation_text = self.getOrderValidationText(message)
+        self.getExecutableMessageList(activity_tool, message, message_dict,
+                                      validation_text_dict)
+
+      # XXX probably this below can be optimized by assigning multiple messages at a time.
+      path_dict = {}
+      assignMessage = activity_tool.SQLQueue_assignMessage
+      processing_node = 1
+      for message in message_dict.itervalues():
+        path = '/'.join(message.object_path)
+        broadcast = message.activity_kw.get('broadcast', 0)
         if broadcast:
           # Broadcast messages must be distributed into all nodes.
-          activity_tool.SQLQueue_assignMessage(processing_node=1, uid=uid)
+          assignMessage(processing_node=1, uid=message.uid)
           if node_count > 1:
-            uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity_queue', id_count=node_count - 1)
-            for node in range(2, node_count+1):
-              activity_tool.SQLQueue_writeMessage(uid = uid_list.pop(),
-                                                  path = line.path,
-                                                  method_id = line.method_id,
-                                                  priority = line.priority,
-                                                  broadcast = 1,
-                                                  processing_node = node,
-                                                  message = line.message,
-                                                  date = line.date)
+            id_tool = activity_tool.getPortalObject().portal_ids
+            uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity_queue',
+                                                       id_count=node_count - 1)
+            priority = message.activity_kw.get('priority', 1)
+            dumped_message = self.dumpMessage(message)
+            date = message.activity_kw.get('at_date', now_date)
+            tag = message.activity_kw.get('tag', '')
+            for node in xrange(2, node_count+1):
+              activity_tool.SQLQueue_writeMessage(uid=uid_list.pop(),
+                                                  path=path,
+                                                  method_id=message.method_id,
+                                                  priority=priority,
+                                                  broadcast=1,
+                                                  processing_node=node,
+                                                  message=dumped_message,
+                                                  date=date,
+                                                  tag=tag)
+          get_transaction().commit()
         else:
-          #LOG("distribute", 0, "assign %s" % uid)
-          activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
-          #get_transaction().commit() # Release locks immediately to allow processing of messages
-          processing_node = processing_node + 1
-          if processing_node > node_count:
-            processing_node = 1 # Round robin
+          # Select a processing node. If the same path appears again, dispatch the message to
+          # the same node, so that object caching is more efficient. Otherwise, apply a round
+          # robin scheduling.
+          node = path_dict.get(path)
+          if node is None:
+            node = processing_node
+            path_dict[path] = node
+            processing_node += 1
+            if processing_node > node_count:
+              processing_node = 1
+
+          assignMessage(processing_node=node, uid=message.uid, broadcast=0)
+          get_transaction().commit() # Release locks immediately to allow processing of messages
 
   # Validation private methods
+  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
+    if isinstance(method_id, str):
+      method_id = [method_id]
+    if isinstance(path, str):
+      path = [path]
+    if isinstance(tag, str):
+      tag = [tag]
+
+    if method_id or message_uid or path or tag:
+      validateMessageList = activity_tool.SQLQueue_validateMessageList
+      result = validateMessageList(method_id=method_id,
+                                   message_uid=message_uid,
+                                   path=path,
+                                   tag=tag)
+      message_list = []
+      for line in result:
+        m = self.loadMessage(line.message,
+                             uid=line.uid,
+                             date=line.date,
+                             processing_node=line.processing_node)
+        m.order_validation_text = self.getOrderValidationText(m)
+        message_list.append(m)
+      return message_list
+    else:
+      return []
+
   def _validate_after_method_id(self, activity_tool, message, value):
-    # Count number of occurances of method_id
-    #get_transaction().commit()
-    if type(value) == type(''):
-      value = [value]
-    result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
-    #LOG('SQLQueue._validate_after_method_id, method_id',0,value)
-    #LOG('SQLQueue._validate_after_method_id, result[0].uid_count',0,result[0].uid_count)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    return self._validate(activity_tool, method_id=value)
 
   def _validate_after_path(self, activity_tool, message, value):
-    # Count number of occurances of path
-    if type(value) == type(''):
-      value = [value]
-    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    return self._validate(activity_tool, path=value)
 
   def _validate_after_message_uid(self, activity_tool, message, value):
-    # Count number of occurances of message_uid
-    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    return self._validate(activity_tool, message_uid=value)
 
   def _validate_after_path_and_method_id(self, activity_tool, message, value):
-    # Count number of occurances of method_id and path
-    if (type(value) != type( (0,) ) and type(value) != type ([])) or len(value)<2:
-      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method : %s' % repr(value))
-      return VALID
-    path = value[0]
-    method = value[1]
-    if type(path) == type(''):
-      path = [path]
-    if type(method) == type(''):
-      method = [method]
-    result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, path=path)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    if not isinstance(value, (tuple, list)) or len(value) < 2:
+      LOG('CMFActivity', WARNING,
+          'unable to recognize value for after_path_and_method: %r' % (value,))
+      return []
+    return self._validate(activity_tool, path=value[0], method_id=value[1])
 
   def _validate_after_tag(self, activity_tool, message, value):
-    # Count number of occurances of tag
-    if type(value) == type(''):
-      value = [value]
-    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, tag=value)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    return self._validate(activity_tool, tag=value)
 
   def _validate_after_tag_and_method_id(self, activity_tool, message, value):
-    # Count number of occurances of tag and method_id
-    if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2:
-      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
-      return VALID
-    tag = value[0]
-    method = value[1]
-    if type(tag) == type(''):
-      tag = [tag]
-    if type(method) == type(''):
-      method = [method]
-    result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, tag=tag)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
-    return VALID
+    if not isinstance(value, (tuple, list)) or len(value) < 2:
+      LOG('CMFActivity', WARNING,
+          'unable to recognize value for after_tag_and_method_id: %r' % (value,))
+      return []
+    return self._validate(activity_tool, tag=value[0], method_id=value[1])
 
   # Required for tests (time shift)
   def timeShift(self, activity_tool, delay, processing_node = None):
@@ -412,6 +420,6 @@
       To simulate timeShift, we simply subtract delay from
       all dates in SQLQueue message table
     """
-    activity_tool.SQLQueue_timeShift(delay = delay, processing_node = processing_node)
+    activity_tool.SQLQueue_timeShift(delay=delay, processing_node=processing_node)
 
 registerActivity(SQLQueue)

Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Wed Apr 11 03:21:56 2007
@@ -86,14 +86,14 @@
 
 class Message:
   """Activity Message Class.
-  
+
   Message instances are stored in an activity queue, inside the Activity Tool.
   """
-  def __init__(self, object, active_process, activity_kw, method_id, args, kw):
-    if type(object) is StringType:
-      self.object_path = object.split('/')
+  def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
+    if isinstance(obj, str):
+      self.object_path = obj.split('/')
     else:
-      self.object_path = object.getPhysicalPath()
+      self.object_path = obj.getPhysicalPath()
     if type(active_process) is StringType:
       self.active_process = active_process.split('/')
     elif active_process is None:
@@ -114,7 +114,7 @@
   def getObject(self, activity_tool):
     """return the object referenced in this message."""
     return activity_tool.unrestrictedTraverse(self.object_path)
-    
+
   def getObjectList(self, activity_tool):
     """return the list of object that can be expanded from this message."""
     try:
@@ -124,9 +124,9 @@
       object_list = getattr(obj, expand_method_id)()
     except KeyError:
       object_list = [self.getObject(activity_tool)]
-      
+
     return object_list
-      
+
   def hasExpandMethod(self):
     """return true if the message has an expand method.
     An expand method is used to expand the list of objects and to turn a
@@ -134,7 +134,7 @@
     transactions affecting only one object at a time (this can prevent
     duplicated method calls)."""
     return self.activity_kw.has_key('expand_method_id')
-    
+
   def changeUser(self, user_name, activity_tool):
     """restore the security context for the calling user."""
     uf = activity_tool.getPortalObject().acl_users
@@ -169,7 +169,7 @@
                     ActiveResult(object_path=object,
                           method_id=self.method_id,
                           result=result)) # XXX Allow other method_id in future
-  
+
   def __call__(self, activity_tool):
     try:
       obj = self.getObject(activity_tool)
@@ -198,8 +198,13 @@
       if hasattr(activity_tool, 'error_log'):
         activity_tool.error_log.raising(sys.exc_info())
 
-  def validate(self, activity, activity_tool):
-    return activity.validate(activity_tool, self, **self.activity_kw)
+  def validate(self, activity, activity_tool, check_order_validation=1):
+    return activity.validate(activity_tool, self,
+                             check_order_validation=check_order_validation,
+                             **self.activity_kw)
+
+  def getDependentMessageList(self, activity, activity_tool):
+    return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
 
   def notifyUser(self, activity_tool, message="Failed Processing Activity"):
     """Notify the user that the activity failed."""
@@ -477,9 +482,9 @@
                 REQUEST.URL1 +
                 '/manageLoadBalancing?manage_tabs_message=' +
                 urllib.quote("Node(s) successfully deleted."))
-        
+
     def process_timer(self, tick, interval, prev="", next=""):
-        """ 
+        """
         Call distribute() if we are the Distributing Node and call tic()
         with our node number.
         This method is called by TimerService in the interval given
@@ -489,23 +494,23 @@
         acquired = timerservice_lock.acquire(0)
         if not acquired:
           return
-        
+
         old_sm = getSecurityManager()
         try:
           # get owner of portal_catalog, so normally we should be able to
           # have the permission to invoke all activities
           user = self.portal_catalog.getWrappedOwner()
           newSecurityManager(self.REQUEST, user)
-          
+
           currentNode = self.getCurrentNode()
-          
+
           # only distribute when we are the distributingNode or if it's empty
           if (self.distributingNode == self.getCurrentNode()):
             self.distribute(len(self._nodes))
 
           elif not self.distributingNode:
             self.distribute(1)
-          
+
           # SkinsTool uses a REQUEST cache to store skin objects, as
           # with TimerService we have the same REQUEST over multiple
           # portals, we clear this cache to make sure the cache doesn't
@@ -513,13 +518,13 @@
           stool = getToolByName(self, 'portal_skins', None)
           if stool is not None:
             stool.changeSkin(None)
-          
+
           # call tic for the current processing_node
           # the processing_node numbers are the indices of the elements in the node tuple +1
           # because processing_node starts from 1
           if currentNode in self._nodes:
             self.tic(list(self._nodes).index(currentNode)+1)
-              
+
           elif len(self._nodes) == 0:
             self.tic(1)
 
@@ -566,9 +571,9 @@
 
       # Initialize if needed
       if not is_initialized: self.initialize()
-      
+
       inner_self = aq_inner(self)
-      
+
       # If this is the first tic after zope is started, reset the processing
       # flag for activities of this node
       if first_run:
@@ -587,7 +592,7 @@
             raise
           except:
             LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)
-  
+
         # Process messages on each queue in round robin
         has_awake_activity = 1
         while has_awake_activity:
@@ -673,7 +678,7 @@
 
     def invoke(self, message):
       message(self)
- 
+
     def invokeGroup(self, method_id, message_list):
       # Invoke a group method.
       object_list = []
@@ -770,7 +775,7 @@
               LOG('ActivityTool', WARNING,
                   'Could not call method %s on object %s' % (
                   m.method_id, m.object_path), error=sys.exc_info())
- 
+
     def newMessage(self, activity, path, active_process,
                    activity_kw, method_id, *args, **kw):
       # Some Security Checking should be made here XXX
@@ -826,8 +831,8 @@
               LOG('ActivityTool', WARNING,
                   'could not dump messages from %s' %
                   (activity,), error=sys.exc_info())
-            
-      if hasattr(folder, 'SQLDict_createMessageTable'):
+
+      if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
         try:
           folder.SQLDict_dropMessageTable()
         except ConflictError:
@@ -838,7 +843,7 @@
               error=sys.exc_info())
         folder.SQLDict_createMessageTable()
 
-      if hasattr(folder, 'SQLQueue_createMessageTable'):
+      if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
         try:
           folder.SQLQueue_dropMessageTable()
         except ConflictError:
@@ -920,16 +925,24 @@
       self.immediateReindexObject()
 
     # Active synchronisation methods
+    security.declarePrivate('validateOrder')
     def validateOrder(self, message, validator_id, validation_value):
+      message_list = self.getDependentMessageList(message, validator_id, validation_value)
+      return len(message_list) > 0
+
+    security.declarePrivate('getDependentMessageList')
+    def getDependentMessageList(self, message, validator_id, validation_value):
       global is_initialized
       if not is_initialized: self.initialize()
+      message_list = []
       for activity in activity_list:
         method_id = "_validate_%s" % validator_id
-        if hasattr(activity, method_id):
-          if getattr(activity,method_id)(aq_inner(self),
-                     message, validation_value):
-            return 1
-      return 0
+        method = getattr(activity, method_id, None)
+        if method is not None:
+          result = method(aq_inner(self), message, validation_value)
+          if result:
+            message_list.extend([(activity, m) for m in result])
+      return message_list
 
     # Required for tests (time shift)
     def timeShift(self, delay):

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql Wed Apr 11 03:21:56 2007
@@ -2,12 +2,12 @@
 title:
 connection_id:cmf_activity_sql_connection
 max_rows:1000
-max_cache:100
+max_cache:0
 cache_time:0
 class_name:
 class_file:
 </dtml-comment>
-<params>processing_node:int=-1</params>
+<params>processing_node</params>
 UPDATE
   message
 SET

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql Wed Apr 11 03:21:56 2007
@@ -2,7 +2,7 @@
 title:
 connection_id:cmf_activity_sql_connection
 max_rows:1000
-max_cache:100
+max_cache:0
 cache_time:0
 class_name:
 class_file:

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql Wed Apr 11 03:21:56 2007
@@ -13,5 +13,6 @@
   processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">,
   processing = 1
 WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>
- OR </dtml-if></dtml-in>
+  uid IN (
+<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
+  )

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -11,19 +11,9 @@
 method_id
 processing_node
 priority
-to_processing_date
-include_processing</params>
-<dtml-if to_processing_date>UPDATE message
-SET
-  processing = 0
-WHERE
-  processing = 1
-AND
-  processing_date < <dtml-sqlvar to_processing_date type="datetime">
-  
-<dtml-var "'\0'">
-
-</dtml-if>SELECT * FROM
+include_processing
+to_date</params>
+SELECT * FROM
     message
 WHERE
 1 = 1
@@ -34,5 +24,6 @@
 <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
 <dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
 <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
+<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
 ORDER BY
      priority, date, uid

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql Wed Apr 11 03:21:56 2007
@@ -1,8 +1,8 @@
 <dtml-comment>
 title:
 connection_id:cmf_activity_sql_connection
-max_rows:10000
-max_cache:100
+max_rows:1000
+max_cache:0
 cache_time:0
 class_name:
 class_file:

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -11,32 +11,31 @@
 message_uid
 path
 tag
+count
 </params>
 SELECT
-    COUNT(DISTINCT uid) as uid_count
+<dtml-if count>
+    COUNT(*) AS uid_count
+<dtml-else>
+    *
+</dtml-if>
 FROM
     message
 WHERE
     processing_node >= -2
 <dtml-if method_id>
-    AND (
-<dtml-in method_id>
-        method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+    AND method_id IN (
+<dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>
 <dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
 <dtml-if path>
-    AND (
-<dtml-in path>
-        path = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+    AND path IN (
+<dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>
 <dtml-if tag>
-    AND (
-<dtml-in tag>
-        tag = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+    AND tag IN (
+<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql Wed Apr 11 03:21:56 2007
@@ -11,7 +11,7 @@
 processing_node
 method_id
 broadcast
-uid:int=0</params>
+uid</params>
 UPDATE message_queue
 SET
   processing_node=<dtml-sqlvar processing_node type="int">,

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql Wed Apr 11 03:21:56 2007
@@ -7,7 +7,7 @@
 class_name:
 class_file:
 </dtml-comment>
-<params>processing_node:int=-1</params>
+<params>processing_node</params>
 UPDATE
   message_queue
 SET

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql Wed Apr 11 03:21:56 2007
@@ -2,7 +2,7 @@
 title:
 connection_id:cmf_activity_sql_connection
 max_rows:1000
-max_cache:100
+max_cache:0
 cache_time:0
 class_name:
 class_file:

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql Wed Apr 11 03:21:56 2007
@@ -19,4 +19,4 @@
 <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
 
 ORDER BY
-    priority, date
+    priority, date, uid

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -10,7 +10,8 @@
 <params>path
 method_id
 processing_node
-priority</params>
+priority
+to_date</params>
 SELECT * FROM
     message_queue
 WHERE
@@ -19,3 +20,6 @@
 <dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
 <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
 <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
+<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
+ORDER BY
+    priority, date, uid

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql Wed Apr 11 03:21:56 2007
@@ -1,8 +1,8 @@
 <dtml-comment>
 title:
 connection_id:cmf_activity_sql_connection
-max_rows:10000
-max_cache:100
+max_rows:1000
+max_cache:0
 cache_time:0
 class_name:
 class_file:
@@ -16,5 +16,5 @@
 WHERE
     processing <> 1
 <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
-<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
-<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
+<dtml-if path> AND path = <dtml-sqlvar path type="string"></dtml-if>
+<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql Wed Apr 11 03:21:56 2007
@@ -11,10 +11,10 @@
 priority
 date</params>
 UPDATE
-	message_queue
+    message_queue
 SET
-	priority = <dtml-sqlvar priority type="int">,
-	processing = 0,
-  date = <dtml-sqlvar date type="datetime">  
+    priority = <dtml-sqlvar priority type="int">,
+    processing = 0,
+    date = <dtml-sqlvar date type="datetime">  
 WHERE
-	uid = <dtml-sqlvar uid type="int">
+    uid = <dtml-sqlvar uid type="int">

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -11,32 +11,31 @@
 message_uid
 path
 tag
+count
 </params>
 SELECT
-    COUNT(DISTINCT uid) as uid_count
+<dtml-if count>
+    COUNT(*) AS uid_count
+<dtml-else>
+    *
+</dtml-if>
 FROM
     message_queue
 WHERE
     processing_node >= -2
 <dtml-if method_id>
-    AND (
-<dtml-in method_id>
-        method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+    AND method_id IN (
+<dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>
 <dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
 <dtml-if path>
-    AND (
-<dtml-in path>
-        path = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+    AND path IN ( 
+<dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>
 <dtml-if tag>
-    AND (
-<dtml-in tag>
-        tag = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+    AND tag IN (
+<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql Wed Apr 11 03:21:56 2007
@@ -13,18 +13,20 @@
 message
 priority
 broadcast
-processing_node=-1
+processing_node
 date
 tag</params>
 INSERT INTO message_queue
 SET
   uid = <dtml-sqlvar uid type="int">,
-	path = <dtml-sqlvar path type="string">,
+  path = <dtml-sqlvar path type="string">,
   <dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if> 
-	method_id = <dtml-sqlvar method_id type="string">,
-	processing_node = <dtml-sqlvar processing_node type="int">,
-	broadcast = <dtml-sqlvar broadcast type="int">,
-	processing = -1,
-	priority = <dtml-sqlvar priority type="int">,
-	tag = <dtml-sqlvar tag type="string">,
-	message = <dtml-sqlvar message type="string">
+  method_id = <dtml-sqlvar method_id type="string">,
+<dtml-if processing_node>
+  processing_node = <dtml-sqlvar processing_node type="int">,
+</dtml-if>
+  broadcast = <dtml-sqlvar broadcast type="int">,
+  processing = -1,
+  priority = <dtml-sqlvar priority type="int">,
+  tag = <dtml-sqlvar tag type="string">,
+  message = <dtml-sqlvar message type="string">




More information about the Erp5-report mailing list