[Erp5-report] r17759 - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Nov 23 11:02:39 CET 2007


Author: seb
Date: Fri Nov 23 11:02:39 2007
New Revision: 17759

URL: http://svn.erp5.org?rev=17759&view=rev
Log:
Commit work done by Vincent

  Make ActivityTool:Message.getObjectList simple to use: detect internaly 
wether an expand method must be called, catch exception when object on which 
the activity was executed cannot be found.
  Remove broadcast message support.
  Merge indexes on processing_node and processing columns on both message and 
message_queue tables.
  Always use SQL server's time.
  Do not update processing node value when setting the message as being 
processed.
  Commit SQL connection as soon as messages get assigned to reduce lock 
duration.
  Make SQLDict ZSQLMethods support list of uids instead of single value per 
call.
  Make ZSQLMethod handle processing_node differently if it's 0 or None (when 
not passed as parameter, behave as if it's None).
  Do not force all parameters to be passed to SQLQueue_setPriority.
  Factorise SQL code inside <dtml-if> blocks.
  Allow to select ranges of lines in readMessageList with a custom offset.
  When reseting message processing state at first activity execution pass 
after a node start, also reset the processing_node.
  Commit SQL connection as soon as messages are set to processing state, 
mainly to make it visible outside current connection.
  Add a common class for SQL-using activity queues.
  CMFActivity/Activity/SQLDict.py
    Remove unused (and broken) prepareQueueMessage method.
    Replace a tab by spaces.
    Add ZSQLMethod wrappers for new ZSQLMethods.
    Split dequeueMessage into dequeueMessage, getProcessableMessageList, 
finalizeMessage_Execution.
    Return True instead of 0 in case of an important error, in order to 
prevent CMFActivity from doing infinite loops  over dequeueMessage when 
something goes wrong.
  CMFActivity/Activity/Queue.py
    Allow caller to specify the current date and transmit it when recursing. 
Fallback on DateTime (calculate just once) if not specified.
  CMFActivity/Activity/SQLQueue.py
    Precompute parameters in prepareQueueMessage to make it easier to add a 
log when needed. Also reduces the distance with SQLDict's equivalent method.
    Add ZSQLMethod wrappers for new ZSQLMethods.
    Split dequeueMessage into dequeueMessage, getProcessableMessageList, 
finalizeMessage_Execution.
    Return True instead of 0 in case of an important error, in order to 
prevent CMFActivity from doing infinite loops  over dequeueMessage when 
something goes wrong.
  Add scripts to monitor activity distribution.  
  Remove unused ZSQLMethods.
  Add new ZSQLMethods related to the new distribution scheme and SQL server 
time grabbing.

Added:
    erp5/trunk/products/CMFActivity/Activity/SQLBase.py
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_getNow.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
Modified:
    erp5/trunk/products/CMFActivity/Activity/Queue.py
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
    erp5/trunk/products/CMFActivity/ActivityTool.py
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_timeShift.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_processMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_timeShift.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql

Modified: erp5/trunk/products/CMFActivity/Activity/Queue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/Queue.py?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/Queue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/Queue.py Fri Nov 23 11:02:39 2007
@@ -117,7 +117,6 @@
     self.is_alive = {}
     self.is_awake = {}
     self.is_initialized = 0
-    self.max_processing_date = DateTime()
 
   def initialize(self, activity_tool):
     # This is the only moment when
@@ -206,7 +205,7 @@
     return message_list
 
   def getExecutableMessageList(self, activity_tool, message, message_dict,
-                               validation_text_dict):
+                               validation_text_dict, now_date=None):
     """Get messages which have no dependent message, and store them in the dictionary.
 
     If the passed message itself is executable, simply store only that message.
@@ -233,7 +232,8 @@
       if message_list:
         # The result is not empty, so this message is not executable.
         validation_text_dict[message.order_validation_text] = 0
-        now_date = DateTime()
+        if now_date is None:
+          now_date = DateTime()
         for activity, m in message_list:
           # Note that the messages may contain ones which are already assigned or not
           # executable yet.
@@ -242,7 +242,7 @@
             message_dict[message.uid] = None
             try:
               self.getExecutableMessageList(activity_tool, m, message_dict,
-                                             validation_text_dict)
+                                             validation_text_dict, now_date=now_date)
             finally:
               del message_dict[message.uid]
       else:

Added: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py (added)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py Fri Nov 23 11:02:39 2007
@@ -1,0 +1,43 @@
+##############################################################################
+#
+# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
+#                    Vincent Pelletier <vincent at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+##############################################################################
+
+class SQLBase:
+  """
+    Define a set of common methods for SQL-based storage of activities.
+  """
+ 
+  def getNow(self, context):
+    """
+      Return the current value for SQL server's NOW().
+      Note that this value is not cached, and is not transactionnal on MySQL
+      side.
+    """
+    result = context.SQLBase_getNow()
+    assert len(result) == 1
+    assert len(result[0]) == 1
+    return result[0][0]

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Nov 23 11:02:39 2007
@@ -26,7 +26,6 @@
 #
 ##############################################################################
 
-from DateTime import DateTime
 from Products.CMFActivity.ActivityTool import registerActivity
 from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
         abortTransactionSynchronously
@@ -36,47 +35,32 @@
 from ZODB.POSException import ConflictError
 import sys
 from types import ClassType
+#from time import time
+from SQLBase import SQLBase
 
 try:
   from transaction import get as get_transaction
 except ImportError:
   pass
 
-from zLOG import LOG, TRACE, WARNING, ERROR, INFO
+from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
 
 MAX_PRIORITY = 5
+# Stop validating more messages when this limit is reached
+MAX_VALIDATED_LIMIT = 1000 
+# Read this many messages to validate.
+READ_MESSAGE_LIMIT = 1000
+# Stop electing more messages for processing if more than this many objects 
+# are impacted by elected messages.
 MAX_GROUPED_OBJECTS = 500
 
-priority_weight = \
-  [1] * 64 + \
-  [2] * 20 + \
-  [3] * 10 + \
-  [4] * 5 + \
-  [5] * 1
-
-LAST_PROCESSING_NODE = 1
-
-class SQLDict(RAMDict):
+class SQLDict(RAMDict, SQLBase):
   """
     A simple OOBTree based queue. It should be compatible with transactions
     and provide sequentiality. Should not create conflict
     because use of OOBTree.
   """
   # Transaction commit methods
-  def prepareQueueMessage(self, activity_tool, m):
-    if m.is_registered:
-      activity_tool.SQLDict_writeMessage( path = '/'.join(m.object_path) ,
-                                          method_id = m.method_id,
-                                          priority = m.activity_kw.get('priority', 1),
-                                          broadcast = m.activity_kw.get('broadcast', 0),
-                                          message = self.dumpMessage(m),
-                                          date = m.activity_kw.get('at_date', DateTime()),
-                                          group_method_id = '\0'.join([m.activity_kw.get('group_method_id', ''),
-                                                                      m.activity_kw.get('group_id', '')]),
-                                          tag = m.activity_kw.get('tag', ''),
-                                          order_validation_text = self.getOrderValidationText(m))
-                                          # Also store uid of activity
-
   def prepareQueueMessageList(self, activity_tool, message_list):
     registered_message_list = []
     for message in message_list:
@@ -87,21 +71,18 @@
       path_list = ['/'.join(message.object_path) for message in registered_message_list]
       method_id_list = [message.method_id for message in registered_message_list]
       priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
-      broadcast_list = [message.activity_kw.get('broadcast', 0) for message in registered_message_list]
       dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
-      datetime = DateTime()
-      date_list = [message.activity_kw.get('at_date', datetime) for message in registered_message_list]
+      date_list = [message.activity_kw.get('at_date', None) for message in registered_message_list]
       group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
                               for message in registered_message_list]
       tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
       order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
       uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', 
-		           id_count=len(registered_message_list), store=0)
+                   id_count=len(registered_message_list), store=0)
       activity_tool.SQLDict_writeMessageList( uid_list = uid_list,
                                               path_list = path_list,
                                               method_id_list = method_id_list,
                                               priority_list = priority_list,
-                                              broadcast_list = broadcast_list,
                                               message_list = dumped_message_list,
                                               date_list = date_list,
                                               group_method_id_list = group_method_id_list,
@@ -164,195 +145,293 @@
       return 0
     return 1
 
+  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, **kw):
+    """
+      Get and reserve a list of messages.
+      limit
+        Maximum number of messages to fetch.
+        This number is not garanted to be reached, because of:
+         - not enough messages being pending execution
+         - race condition (other nodes reserving the same messages at the same
+           time)
+        This number is guaranted not to be exceeded.
+        If None (or not given) no limit apply.
+    """
+    result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, limit=limit)
+    if len(result) == 0:
+      activity_tool.SQLDict_reserveMessageList(limit=limit, processing_node=processing_node, to_date=date, **kw)
+      result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, limit=limit)
+    return result
+
+  def makeMessageListAvailable(self, activity_tool, uid_list):
+    """
+      Put messages back in processing_node=0 .
+    """
+    if len(uid_list):
+      activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
+
+  def deleteDuplicatedLineList(self, activity_tool, date, processing_node, line):
+    """
+      Delete all messages matching given one except itself.
+      Operator  Value
+      !=        uid
+      <=        date
+      =         path, method_id, group_method_id, order_validation_text,
+                processing_node, tag
+    """
+    activity_tool.SQLDict_deleteDuplicatedMessageList(
+      processing_node=processing_node, uid=line.uid,
+      to_date=line.date, path=line.path, method_id=line.method_id,
+      group_method_id=line.group_method_id,
+      order_validation_text=line.order_validation_text,
+      tag=line.tag)
+
+  def getProcessableMessageList(self, activity_tool, processing_node):
+    """
+      Always true:
+        For each reserved message, delete redundant messages when it gets
+        reserved (definitely lost, but they are expandable since redundant).
+
+      - reserve a message
+      - set reserved message to processing=1 state
+      - if this message has a group_method_id:
+        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+          - get one message from the reserved bunch (this messages will be
+            "needed")
+          - increase the number of impacted object
+        - set "needed" reserved messages to processing=1 state
+        - unreserve "unneeded" messages
+      - return still-reserved message list and a group_method_id
+
+      If any error happens in above described process, try to unreserve all
+      messages already reserved in that process.
+      If it fails, complain loudly that some messages might still be in an
+      unclean state.
+
+      Returned values:
+        3-tuple:
+          - list of 3-tuple:
+            - message uid
+            - message
+            - priority
+          - impacted object count
+          - group_method_id
+    """
+    def getReservedMessageList(**kw):
+      line_list = self.getReservedMessageList(activity_tool=activity_tool,
+                                              date=now_date,
+                                              processing_node=processing_node,
+                                              **kw)
+      if len(line_list):
+        LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
+      return line_list
+    def deleteDuplicatedLineList(line):
+      self.deleteDuplicatedLineList(activity_tool=activity_tool, date=now_date,
+                                 processing_node=processing_node, line=line)
+    def makeMessageListAvailable(uid_list):
+      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+    BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
+    now_date = self.getNow(activity_tool)
+    message_list = []
+    def append(line, message):
+      uid = line.uid
+      message_list.append((uid, message, line.priority))
+    count = 0
+    group_method_id = None
+    try:
+      result = getReservedMessageList(limit=1)
+      if len(result) > 0:
+        line = result[0]
+        m = self.loadMessage(line.message, uid=line.uid)
+        append(line, m)
+        group_method_id = line.group_method_id
+        # Delete all messages matching current one - except current one.
+        deleteDuplicatedLineList(line)
+        activity_tool.SQLDict_processMessage(uid=[line.uid])
+        if group_method_id not in (None, '', '\0'):
+          # Count the number of objects to prevent too many objects.
+          count += len(m.getObjectList(activity_tool))
+          if count < MAX_GROUPED_OBJECTS:
+            # Retrieve objects which have the same group method.
+            result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
+            path_and_method_id_dict = {}
+            unreserve_uid_list = []
+            for line in result:
+              # All fetched lines have the same group_method_id and
+              # processing_node.
+              # Their dates are lower-than or equal-to now_date.
+              # We read each line once so lines have distinct uids.
+              # So what remains to be filtered on are path, method_id,
+              # order_validation_text, tag
+              key = (line.path, line.method_id, line.order_validation_text, line.tag)
+              if key in path_and_method_id_dict:
+                LOG('SQLDict', TRACE, 'Duplicate of message %r has been skipped (it should already be deleted anyway): %r' % (path_and_method_id_dict[key], line.uid))
+                continue
+              path_and_method_id_dict[key] = line.uid
+              deleteDuplicatedLineList(line)
+              if count < MAX_GROUPED_OBJECTS:
+                m = self.loadMessage(line.message, uid=line.uid)
+                count += len(m.getObjectList(activity_tool))
+                append(line, m)
+              else:
+                unreserve_uid_list.append(line.uid)
+            activity_tool.SQLDict_processMessage(uid=[x[0] for x in message_list])
+            # Unreserve extra messages as soon as possible.
+            makeMessageListAvailable(unreserve_uid_list)
+      return message_list, count, group_method_id
+    except:
+      LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
+      if len(message_list):
+        to_free_uid_list = [x[0] for x in message_list]
+        try:
+          makeMessageListAvailable(to_free_uid_list)
+        except:
+          LOG('SQLDict', PANIC, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+        else:
+          if len(to_free_uid_list):
+            LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+      else:
+        LOG('SQLDict', TRACE, '(no message was reserved)')
+      return [], 0, None
+
+  def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
+    def makeMessageListAvailable(uid_list):
+      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+    deletable_uid_list = []
+    delay_uid_list = []
+    final_error_uid_list = []
+    message_with_active_process_list = []
+    for uid, m, priority in message_uid_priority_list:
+      if m.is_executed:
+        deletable_uid_list.append(uid)
+        if m.active_process:
+          message_with_active_process_list.append(m)
+      else:
+        if type(m.exc_type) is ClassType and \
+           issubclass(m.exc_type, ConflictError):
+          delay_uid_list.append(uid)
+        elif priority > MAX_PRIORITY:
+          final_error_uid_list.append(uid)
+        else:
+          try:
+            # Immediately update, because values different for every message
+            activity_tool.SQLDict_setPriority(
+              uid=[uid],
+              delay=VALIDATION_ERROR_DELAY,
+              priority=priority + 1)
+          except:
+            LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
+          try:
+            makeMessageListAvailable(delay_uid_list)
+          except:
+            LOG('SQLDict', PANIC, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
+          else:
+            LOG('SQLDict', TRACE, 'Freed message %r' % (uid, ))
+    if len(deletable_uid_list):
+      try:
+        activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
+      except:
+        LOG('SQLDict', PANIC, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
+      else:
+        LOG('SQLDict', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
+    if len(delay_uid_list):
+      try:
+        # If this is a conflict error, do not lower the priority but only delay.
+        activity_tool.SQLDict_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY)
+      except:
+        LOG('SQLDict', TRACE, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
+      try:
+        makeMessageListAvailable(delay_uid_list)
+      except:
+        LOG('SQLDict', PANIC, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
+      else:
+        LOG('SQLDict', TRACE, 'Freed messages %r' % (delay_uid_list, ))
+    if len(final_error_uid_list):
+      try:
+        activity_tool.SQLDict_assignMessage(uid=final_error_uid_list,
+                                            processing_node=INVOKE_ERROR_STATE)
+      except:
+        LOG('SQLDict', WARNING, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
+    for m in message_with_active_process_list:
+      active_process = activity_tool.unrestrictedTraverse(m.active_process)
+      if not active_process.hasActivity():
+        # No more activity
+        m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
+
   # Queue semantic
   def dequeueMessage(self, activity_tool, processing_node):
-    readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
-    if readMessage is None:
-      return 1
-
-    now_date = DateTime()
-    result = readMessage(processing_node=processing_node, to_date=now_date)
-    if len(result) > 0:
-      line = result[0]
-      path = line.path
-      method_id = line.method_id
-      group_method_id = line.group_method_id
-      order_validation_text = line.order_validation_text
-      uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
-                                                   processing_node=None, to_date=now_date,
-                                                   order_validation_text=order_validation_text,
-                                                   group_method_id=group_method_id)
-      uid_list = [x.uid for x in uid_list]
-      uid_list_list = [uid_list]
-      priority_list = [line.priority]
-      # Make sure message can not be processed anylonger
-      if len(uid_list) > 0:
-        # Set selected messages to processing
-        activity_tool.SQLDict_processMessage(uid=uid_list,
-                                             processing_node=processing_node)
-      get_transaction().commit() # Release locks before starting a potentially long calculation
-      # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
-
-      # At this point, messages are marked as processed. So catch any kind of exception to make sure
-      # that they are unmarked on error.
+    def makeMessageListAvailable(uid_list):
+      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+    message_uid_priority_list, count, group_method_id = \
+      self.getProcessableMessageList(activity_tool, processing_node)
+    if len(message_uid_priority_list):
+      # Remove group_id parameter from group_method_id
+      if group_method_id is not None:
+        group_method_id = group_method_id.split('\0')[0]
+      message_list = [x[1] for x in message_uid_priority_list]
+      if group_method_id not in (None, ""):
+        method  = activity_tool.invokeGroup
+        args = (group_method_id, message_list)
+      else:
+        method = activity_tool.invoke
+        args = (message_list[0], )
       try:
-        m = self.loadMessage(line.message, uid=line.uid)
-        message_list = [m]
-        # Validate message (make sure object exists, priority OK, etc.)
-        if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
-          return 0
-        
-        if group_method_id not in (None, '', '\0'):
-          # Count the number of objects to prevent too many objects.
-          if m.hasExpandMethod():
-            count = len(m.getObjectList(activity_tool))
-          else:
-            count = 1
-          
-          if count < MAX_GROUPED_OBJECTS:
-            # Retrieve objects which have the same group method.
-            result = readMessage(processing_node=processing_node,
-                                 to_date=now_date, group_method_id=group_method_id,
-                                 order_validation_text=order_validation_text)
-            #LOG('SQLDict dequeueMessage', 0, 'result = %d, group_method_id %s' % (len(result), group_method_id))
-            path_and_method_id_dict = {}
-            for line in result:
-              path = line.path
-              method_id = line.method_id
-
-              # Prevent using the same pair of a path and a method id.
-              key = (path, method_id)
-              if key in path_and_method_id_dict:
-                continue
-              path_and_method_id_dict[key] = 1
-
-              uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
-                                                           processing_node=None,
-                                                           to_date=now_date, group_method_id=group_method_id,
-                                                           order_validation_text=order_validation_text)
-              uid_list = [x.uid for x in uid_list]
-              if len(uid_list) > 0:
-                # Set selected messages to processing
-                activity_tool.SQLDict_processMessage(uid=uid_list,
-                                                     processing_node=processing_node)
-              get_transaction().commit() # Release locks before starting a potentially long calculation
-
-              # Save this newly marked uids as soon as possible.
-              uid_list_list.append(uid_list)
-
-              m = self.loadMessage(line.message, uid=line.uid)
-              if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
-                if m.hasExpandMethod():
-                  count += len(m.getObjectList(activity_tool))
-                else:
-                  count += 1
-                message_list.append(m)
-                priority_list.append(line.priority)
-                if count >= MAX_GROUPED_OBJECTS:
-                  break
-              else:
-                # If the uids were not valid, remove them from the list, as validateMessage
-                # unmarked them.
-                uid_list_list.pop()
-
-          # Release locks before starting a potentially long calculation
-          get_transaction().commit()
-
-        # Remove group_id parameter from group_method_id
-        if group_method_id is not None:
-          group_method_id = group_method_id.split('\0')[0]
+        # Commit right before executing messages.
+        # As MySQL transaction do no start exactly at the same time as ZODB
+        # transactions but a bit later, messages available might be called
+        # on objects which are not available - or available in an old
+        # version - to ZODB connector.
+        # So all connectors must be commited now that we have selected
+        # everything needed from MySQL to get a fresh view of ZODB objects.
+        get_transaction().commit() 
         # Try to invoke
-        if group_method_id not in (None, ""):
-          LOG('SQLDict', INFO,
-              'invoking a group method %s with %d objects '\
-              ' (%d objects in expanded form)' % ( 
-            group_method_id, len(message_list), count))
-          activity_tool.invokeGroup(group_method_id, message_list)
+        method(*args)
+      except:
+        LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(x[0], x[1].object_path, x[1].method_id) for x in message_uid_priority_list], ), error=sys.exc_info())
+        to_free_uid_list = [x[0] for x in message_uid_priority_list]
+        try:
+          makeMessageListAvailable(to_free_uid_list)
+        except:
+          LOG('SQLDict', PANIC, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
         else:
-          activity_tool.invoke(message_list[0])
-
-        # Check if messages are executed successfully.
-        # When some of them are executed successfully, it may not be acceptable to
-        # abort the transaction, because these remain pending, only due to other
-        # invalid messages. This means that a group method should not be used if
-        # it has a side effect. For now, only indexing uses a group method, and this
-        # has no side effect.
-        for m in message_list:
-          if m.is_executed:
-            get_transaction().commit()
-            break
-        else:
-          abortTransactionSynchronously()
-      except:
-        LOG('SQLDict', INFO, 
-            'an exception happened during processing %r' % (uid_list_list,),
-            error=sys.exc_info())
-        # If an exception occurs, abort the transaction to minimize the impact,
+          LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
         try:
           abortTransactionSynchronously()
         except:
           # Unfortunately, database adapters may raise an exception against abort.
-          LOG('SQLDict', WARNING,
+          LOG('SQLDict', PANIC,
               'abort failed, thus some objects may be modified accidentally')
-          pass
-
-        # An exception happens at somewhere else but invoke or invokeGroup, so messages
-        # themselves should not be delayed.
+          return True # Stop processing messages for this tic call for this queue.
+      # Only abort if nothing succeeded.
+      # This means that when processing multiple messages, failed ones must not cause
+      # bad things to happen if transaction is commited.
+      if len([x for x in message_uid_priority_list if x[1].is_executed]) == 0:
+        endTransaction = abortTransactionSynchronously
+      else:
+        endTransaction = get_transaction().commit
+      try:
+        endTransaction()
+      except:
+        LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(x[0], x[1].object_path, x[1].method_id) for x in message_uid_priority_list], ), error=sys.exc_info())
+        failed_message_uid_list = [x[0] for x in message_uid_priority_list]
         try:
-          for uid_list in uid_list_list:
-            if len(uid_list):
-              # This only sets processing to zero.
-              activity_tool.SQLDict_setPriority(uid=uid_list)
-              get_transaction().commit()
+          makeMessageListAvailable(failed_message_uid_list)
         except:
-          LOG('SQLDict', ERROR,
-              'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
-              error=sys.exc_info())
-          raise
-        return 0
-      
-      try:
-        for i in xrange(len(message_list)):
-          m = message_list[i]
-          uid_list = uid_list_list[i]
-          priority = priority_list[i]
-          if m.is_executed:
-            if len(uid_list) > 0:
-              activity_tool.SQLDict_delMessage(uid=uid_list)       # Delete it
-            get_transaction().commit()                             # If successful, commit
-            if m.active_process:
-              active_process = activity_tool.unrestrictedTraverse(m.active_process)
-              if not active_process.hasActivity():
-                # No more activity
-                m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
-          else:
-            if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
-              # If this is a conflict error, do not lower the priority but only delay.
-              activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY)
-              get_transaction().commit() # Release locks before starting a potentially long calculation
-            elif priority > MAX_PRIORITY:
-              # This is an error
-              if len(uid_list) > 0:
-                activity_tool.SQLDict_assignMessage(uid=uid_list,
-                                                    processing_node=INVOKE_ERROR_STATE)
-                                                                                # Assign message back to 'error' state
-              m.notifyUser(activity_tool)                                       # Notify Error
-              get_transaction().commit()                                        # and commit
-            else:
-              # Lower priority
-              if len(uid_list) > 0:
-                activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
-                                                  priority=priority + 1)
-              get_transaction().commit() # Release locks before starting a potentially long calculation
-      except:
-        LOG('SQLDict', ERROR,
-            'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
-            error=sys.exc_info())
-        raise
-
-      return 0
-    get_transaction().commit() # Release locks before starting a potentially long calculation
-    return 1
+          LOG('SQQueue', PANIC, 'Failed to free remaining messages: %r' % (failed_message_uid_list, ), error=sys.exc_info())
+        else:
+          LOG('SQQueue', TRACE, 'Freed messages %r' % (failed_message_uid_list, ))
+        if endTransaction == abortTransactionSynchronously:
+          LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
+        else:
+          try:
+            abortTransactionSynchronously()
+          except:
+            LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
+        return True # Stop processing messages for this tic call for this queue.
+      self.finalizeMessageExecution(activity_tool, message_uid_priority_list)
+    get_transaction().commit()
+    return not len(message_uid_priority_list)
 
   def hasActivity(self, activity_tool, object, **kw):
     hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
@@ -470,76 +549,34 @@
     return message_list
 
   def distribute(self, activity_tool, node_count):
+    offset = 0
     readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
     if readMessageList is not None:
-      global LAST_PROCESSING_NODE
-      now_date = DateTime()
+      now_date = self.getNow(activity_tool)
       result = readMessageList(path=None, method_id=None, processing_node=-1,
-                               to_date=now_date, include_processing=0)
-      get_transaction().commit()
-
-      validation_text_dict = {'none': 1}
-      message_dict = {}
-      for line in result:
-        message = self.loadMessage(line.message, uid = line.uid,
-                                   order_validation_text = line.order_validation_text)
-        self.getExecutableMessageList(activity_tool, message, message_dict,
-                                      validation_text_dict)
-      # XXX probably this below can be optimized by assigning multiple messages at a time.
-      path_dict = {}
-      assignMessage = activity_tool.SQLDict_assignMessage
-      processing_node = LAST_PROCESSING_NODE
-      id_tool = activity_tool.getPortalObject().portal_ids
-  
-      for message in message_dict.itervalues():
-        path = '/'.join(message.object_path)
-        broadcast = message.activity_kw.get('broadcast', 0)
-        if broadcast:
-          # Broadcast messages must be distributed into all nodes.
-          uid = message.uid
-          assignMessage(processing_node=1, uid=[uid])
-          if node_count > 1:
-            uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
-                                                       id_count=node_count - 1,
-						       store=0)
-            path_list = [path] * (node_count - 1)
-            method_id_list = [message.method_id] * (node_count - 1)
-            priority_list = [message.activity_kw.get('priority', 1)] * (node_count - 1)
-            processing_node_list = range(2, node_count + 1)
-            broadcast_list = [1] * (node_count - 1)
-            message_list = [self.dumpMessage(message)] * (node_count - 1)
-            date_list = [message.activity_kw.get('at_date', now_date)] * (node_count - 1)
-            group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''),
-                                              message.activity_kw.get('group_id', '')])] * (node_count - 1)
-            tag_list = [message.activity_kw.get('tag', '')] * (node_count - 1)
-            order_validation_text_list = [message.order_validation_text] * (node_count - 1)
-            activity_tool.SQLDict_writeMessageList(uid_list=uid_list,
-                                                   path_list=path_list,
-                                                   method_id_list=method_id_list,
-                                                   priority_list=priority_list,
-                                                   broadcast_list=broadcast_list,
-                                                   processing_node_list=processing_node_list,
-                                                   message_list=message_list,
-                                                   date_list=date_list,
-                                                   group_method_id_list=group_method_id_list,
-                                                   tag_list=tag_list,
-                                                   order_validation_text_list=order_validation_text_list)
-          get_transaction().commit()
-        else:
-          # Select a processing node. If the same path appears again, dispatch the message to
-          # the same node, so that object caching is more efficient. Otherwise, apply a round
-          # robin scheduling.
-          node = path_dict.get(path)
-          if node is None:
-            node = processing_node
-            path_dict[path] = node
-            processing_node += 1
-            if processing_node > node_count:
-              processing_node = 1
-
-          assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
-          get_transaction().commit() # Release locks immediately to allow processing of messages
-      LAST_PROCESSING_NODE = processing_node
+                               to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT)
+      validated_count = 0
+      #TIME_begin = time()
+      while len(result) and validated_count < MAX_VALIDATED_LIMIT:
+        get_transaction().commit()
+
+        validation_text_dict = {'none': 1}
+        message_dict = {}
+        for line in result:
+          message = self.loadMessage(line.message, uid = line.uid,
+                                     order_validation_text = line.order_validation_text)
+          self.getExecutableMessageList(activity_tool, message, message_dict,
+                                        validation_text_dict, now_date=now_date)
+        distributable_count = len(message_dict)
+        if distributable_count:
+          activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
+          validated_count += distributable_count
+        if validated_count < MAX_VALIDATED_LIMIT:
+          offset += READ_MESSAGE_LIMIT
+          result = readMessageList(path=None, method_id=None, processing_node=-1,
+                                   to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT)
+      #TIME_end = time()
+      #LOG('SQLDict.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset - READ_MESSAGE_LIMIT + len(result), validated_count))
 
   # Validation private methods
   def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Nov 23 11:02:39 2007
@@ -28,7 +28,6 @@
 
 from Products.CMFActivity.ActivityTool import registerActivity
 from RAMQueue import RAMQueue
-from DateTime import DateTime
 from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
         abortTransactionSynchronously
 from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
@@ -37,26 +36,25 @@
 from types import ClassType
 import sys
 from time import time
+from sets import ImmutableSet
+from SQLBase import SQLBase
 
 try:
   from transaction import get as get_transaction
 except ImportError:
   pass
 
-from zLOG import LOG, WARNING, ERROR
+from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
 
 MAX_PRIORITY = 5
-
-priority_weight = \
-  [1] * 64 + \
-  [2] * 20 + \
-  [3] * 10 + \
-  [4] * 5 + \
-  [5] * 1
-
-LAST_PROCESSING_NODE = 1
-
-class SQLQueue(RAMQueue):
+# Stop validating more messages when this limit is reached
+MAX_VALIDATED_LIMIT = 1000
+# Read this many messages to validate.
+READ_MESSAGE_LIMIT = 1000
+# Process this many messages in each dequeueMessage call.
+MESSAGE_BUNDLE_SIZE = 10
+
+class SQLQueue(RAMQueue, SQLBase):
   """
     A simple OOBTree based queue. It should be compatible with transactions
     and provide sequentiality. Should not create conflict
@@ -66,121 +64,240 @@
     if m.is_registered:
       id_tool = activity_tool.getPortalObject().portal_ids
       uid = id_tool.generateNewLengthId(id_group='portal_activity_queue', store=0)
-      activity_tool.SQLQueue_writeMessage(uid = uid,
-                                          path = '/'.join(m.object_path) ,
-                                          method_id = m.method_id,
-                                          priority = m.activity_kw.get('priority', 1),
-                                          broadcast = m.activity_kw.get('broadcast', 0),
-                                          message = self.dumpMessage(m),
-                                          date = m.activity_kw.get('at_date', DateTime()),
-                                          tag = m.activity_kw.get('tag', ''))
+      path = '/'.join(m.object_path)
+      method_id = m.method_id
+      priority = m.activity_kw.get('priority', 1)
+      date = m.activity_kw.get('at_date', None)
+      if date is None:
+        date = self.getNow(activity_tool)
+      tag = m.activity_kw.get('tag', '')
+      activity_tool.SQLQueue_writeMessage(uid=uid,
+                                          path=path,
+                                          method_id=method_id,
+                                          priority=priority,
+                                          message=self.dumpMessage(m),
+                                          date=date,
+                                          tag=tag)
 
   def prepareDeleteMessage(self, activity_tool, m):
     # Erase all messages in a single transaction
     #LOG("prepareDeleteMessage", 0, str(m.__dict__))
     activity_tool.SQLQueue_delMessage(uid = [m.uid])
 
+  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
+    """
+      Get and reserve a list of messages.
+      limit
+        Maximum number of messages to fetch.
+        This number is not garanted to be reached, because of:
+         - not enough messages being pending execution
+         - race condition (other nodes reserving the same messages at the same
+           time)
+        This number is guaranted not to be exceeded.
+        If None (or not given) no limit apply.
+    """
+    result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, limit=limit)
+    if len(result) == 0:
+      activity_tool.SQLQueue_reserveMessageList(limit=limit, processing_node=processing_node, to_date=date)
+      result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, limit=limit)
+    return result
+
+  def makeMessageListAvailable(self, activity_tool, uid_list):
+    """
+      Put messages back in processing_node=0 .
+    """
+    if len(uid_list):
+      activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
+
+  def getProcessableMessageList(self, activity_tool, processing_node):
+    """
+      Always true:
+        For each reserved message, delete redundant messages when it gets
+        reserved (definitely lost, but they are expandable since redundant).
+
+      - reserve a message
+      - set reserved message to processing=1 state
+      - if this message has a group_method_id:
+        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
+        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
+          - get one message from the reserved bunch (this messages will be
+            "needed")
+          - increase the number of impacted object
+        - set "needed" reserved messages to processing=1 state
+        - unreserve "unneeded" messages
+      - return still-reserved message list
+
+      If any error happens in above described process, try to unreserve all
+      messages already reserved in that process.
+      If it fails, complain loudly that some messages might still be in an
+      unclean state.
+
+      Returned values:
+        list of 3-tuple:
+          - message uid
+          - message
+          - priority
+    """
+    def getReservedMessageList(limit):
+      line_list = self.getReservedMessageList(activity_tool=activity_tool,
+                                              date=now_date,
+                                              processing_node=processing_node,
+                                              limit=limit)
+      if len(line_list):
+        LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
+      return line_list
+    def makeMessageListAvailable(uid_list):
+      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+    now_date = self.getNow(activity_tool)
+    message_list = []
+    def append(line, message):
+      uid = line.uid
+      message_list.append((uid, message, line.priority))
+    try:
+      result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
+      for line in result:
+        m = self.loadMessage(line.message, uid=line.uid)
+        append(line, m)
+      if len(message_list):
+        activity_tool.SQLQueue_processMessage(uid=[x[0] for x in message_list])
+      return message_list
+    except:
+      LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
+      if len(message_list):
+        to_free_uid_list = [x[0] for x in message_list]
+        try:
+          makeMessageListAvailable(to_free_uid_list)
+        except:
+          LOG('SQLQueue', PANIC, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+        else:
+          if len(to_free_uid_list):
+            LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+      else:
+        LOG('SQLQueue', TRACE, '(no message was reserved)')
+      return []
+
+  def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
+    def makeMessageListAvailable(uid_list):
+      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+    deletable_uid_list = []
+    delay_uid_list = []
+    final_error_uid_list = []
+    message_with_active_process_list = []
+    for uid, m, priority in message_uid_priority_list:
+      if m.is_executed:
+        deletable_uid_list.append(uid)
+        if m.active_process:
+          message_with_active_process_list.append(m)
+      else:
+        if type(m.exc_type) is ClassType and \
+           issubclass(m.exc_type, ConflictError):
+          delay_uid_list.append(uid)
+        elif priority > MAX_PRIORITY:
+          final_error_uid_list.append(uid)
+        else:
+          try:
+            # Immediately update, because values different for every message
+            activity_tool.SQLQueue_setPriority(
+              uid=[uid],
+              delay=VALIDATION_ERROR_DELAY,
+              priority=priority + 1)
+          except:
+            LOG('SQLQueue', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
+          try:
+            makeMessageListAvailable(delay_uid_list)
+          except:
+            LOG('SQLQueue', PANIC, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
+          else:
+            LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, ))
+    if len(deletable_uid_list):
+      try:
+        activity_tool.SQLQueue_delMessage(uid=deletable_uid_list)
+      except:
+        LOG('SQLQueue', PANIC, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
+      else:
+        LOG('SQLQueue', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
+    if len(delay_uid_list):
+      try:
+        # If this is a conflict error, do not lower the priority but only delay.
+        activity_tool.SQLQueue_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY)
+      except:
+        LOG('SQLQueue', TRACE, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
+      try:
+        makeMessageListAvailable(delay_uid_list)
+      except:
+        LOG('SQLQueue', PANIC, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
+      else:
+        LOG('SQLQueue', TRACE, 'Freed messages %r' % (delay_uid_list, ))
+    if len(final_error_uid_list):
+      try:
+        activity_tool.SQLQueue_assignMessage(uid=final_error_uid_list,
+                                             processing_node=INVOKE_ERROR_STATE)
+      except:
+        LOG('SQLQueue', WARNING, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
+    for m in message_with_active_process_list:
+      active_process = activity_tool.unrestrictedTraverse(m.active_process)
+      if not active_process.hasActivity():
+        # No more activity
+        m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
+
+
   def dequeueMessage(self, activity_tool, processing_node):
-    readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
-    if readMessage is None:
-      return 1
-
-    # XXX: arbitrary maximum delay.
-    # Stop processing new messages if processing duration exceeds 10 seconds.
-    activity_stop_time = time() + 10
-    now_date = DateTime()
-    # Next processing date in case of error
-    next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
-    message_list = readMessage(processing_node=processing_node, to_date=now_date)
-    for line in message_list:
-      if time() > activity_stop_time:
-        break
-      path = line.path
-      method_id = line.method_id
-      # Make sure message can not be processed anylonger
-      activity_tool.SQLQueue_processMessage(uid=line.uid)
-      get_transaction().commit() # Release locks before starting a potentially long calculation
-
-      # At this point, the message is marked as processed.
-      try:
-        m = self.loadMessage(line.message)
-        # Make sure object exists
-        validation_state = m.validate(self, activity_tool, check_order_validation=0)
-        if validation_state is not VALID:
-          if line.priority > MAX_PRIORITY:
-            # This is an error.
-            # Assign message back to 'error' state.
-            activity_tool.SQLQueue_assignMessage(uid=line.uid,
-                                                 processing_node=VALIDATE_ERROR_STATE)
-            get_transaction().commit()                                        # and commit
-          else:
-            # Lower priority
-            activity_tool.SQLQueue_setPriority(uid=line.uid, priority=line.priority + 1)
-            get_transaction().commit() # Release locks before starting a potentially long calculation
-          continue
-
+    def makeMessageListAvailable(uid_list):
+      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
+    message_uid_priority_list = \
+      self.getProcessableMessageList(activity_tool, processing_node)
+    if len(message_uid_priority_list):
+      processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
+      processed_message_uid_list = []
+      # Commit right before executing messages.
+      # As MySQL transaction do no start exactly at the same time as ZODB
+      # transactions but a bit later, messages available might be called
+      # on objects which are not available - or available in an old
+      # version - to ZODB connector.
+      # So all connectors must be commited now that we have selected
+      # everything needed from MySQL to get a fresh view of ZODB objects.
+      get_transaction().commit()
+      for value in message_uid_priority_list:
+        processed_message_uid_list.append(value)
         # Try to invoke
-        activity_tool.invoke(m) # Try to invoke the message
-        if m.is_executed:                                          # Make sure message could be invoked
-          get_transaction().commit()                                        # If successful, commit
-      except:
-        # If an exception occurs, abort the transaction to minimize the impact,
-        LOG('SQLQueue', WARNING, 'Could not evaluate %s on %s' % (m.method_id, path),
-             error=sys.exc_info())
         try:
-          abortTransactionSynchronously()
-        except:
-          # Unfortunately, database adapters may raise an exception against abort.
-          LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
-          pass
-
-        # An exception happens at somewhere else but invoke, so messages
-        # themselves should not be delayed.
-        try:
-          activity_tool.SQLQueue_setPriority(uid=line.uid, date=line.date,
-                                             priority=line.priority)
+          activity_tool.invoke(value[1])
+          # Commit so that if a message raises it doesn't causes previous
+          # successfull messages to be rolled back. This commit might fail,
+          # so it is protected the same way as activity execution by the
+          # same "try" block.
           get_transaction().commit()
         except:
-          LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised, and cannot even set processing to zero due to an exception',
-              error=sys.exc_info())
-          raise
-        continue
-
-      try:
-        if m.is_executed:
-          activity_tool.SQLQueue_delMessage(uid=[line.uid])  # Delete it
-        else:
+          LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
           try:
-            # If not, abort transaction and start a new one
+            makeMessageListAvailable([value[0]])
+          except:
+            LOG('SQQueue', PANIC, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
+          else:
+            LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
+          try:
             abortTransactionSynchronously()
           except:
             # Unfortunately, database adapters may raise an exception against abort.
-            LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
-            pass
-
-          if type(m.exc_type) is ClassType \
-                  and issubclass(m.exc_type, ConflictError):
-            activity_tool.SQLQueue_setPriority(uid=line.uid,
-                                               date=next_processing_date,
-                                               priority=line.priority)
-          elif line.priority > MAX_PRIORITY:
-            # This is an error
-            activity_tool.SQLQueue_assignMessage(uid=line.uid,
-                                                 processing_node=INVOKE_ERROR_STATE)
-                                                                              # Assign message back to 'error' state
-            m.notifyUser(activity_tool)                                       # Notify Error
-          else:
-            # Lower priority
-            activity_tool.SQLQueue_setPriority(uid=line.uid, date=next_processing_date,
-                                               priority=line.priority + 1)
-        get_transaction().commit()
-      except:
-        LOG('SQLQueue', ERROR,
-            'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
-            error=sys.exc_info())
-        raise
-    get_transaction().commit() # Release locks before starting a potentially long calculation
-    return len(message_list) == 0
+            LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
+            return True # Stop processing messages for this tic call for this queue.
+        if time() > processing_stop_time:
+          LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
+          break
+      # Release all unprocessed messages
+      processed_uid_set = ImmutableSet([x[0] for x in processed_message_uid_list])
+      to_free_uid_list = [x[0] for x in message_uid_priority_list if x[0] not in processed_uid_set]
+      if len(to_free_uid_list):
+        try:
+          makeMessageListAvailable(to_free_uid_list)
+        except:
+          LOG('SQQueue', PANIC, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
+        else:
+          LOG('SQQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
+      self.finalizeMessageExecution(activity_tool, processed_message_uid_list)
+    get_transaction().commit()
+    return not len(message_uid_priority_list)
+
 
   def hasActivity(self, activity_tool, object, **kw):
     hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
@@ -310,69 +427,36 @@
     return message_list
 
   def distribute(self, activity_tool, node_count):
+    offset = 0
     readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
     if readMessageList is not None:
-      global LAST_PROCESSING_NODE
-      now_date = DateTime()
-      result = readMessageList(path=None, method_id=None,
-                               processing_node=-1, to_date=now_date)
-      get_transaction().commit()
-
-      validation_text_dict = {'none': 1}
-      message_dict = {}
-      for line in result:
-        message = self.loadMessage(line.message, uid=line.uid)
-        message.order_validation_text = self.getOrderValidationText(message)
-        self.getExecutableMessageList(activity_tool, message, message_dict,
-                                      validation_text_dict)
-
-      # XXX probably this below can be optimized by assigning multiple messages at a time.
-      path_dict = {}
-      assignMessage = activity_tool.SQLQueue_assignMessage
-      processing_node = LAST_PROCESSING_NODE
-      id_tool = activity_tool.getPortalObject().portal_ids
-      for message in message_dict.itervalues():
-        path = '/'.join(message.object_path)
-        broadcast = message.activity_kw.get('broadcast', 0)
-        if broadcast:
-          # Broadcast messages must be distributed into all nodes.
-          assignMessage(processing_node=1, uid=message.uid)
-          if node_count > 1:
-            uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity_queue',
-                                                       id_count=node_count - 1,
-						       store=0)
-            priority = message.activity_kw.get('priority', 1)
-            dumped_message = self.dumpMessage(message)
-            date = message.activity_kw.get('at_date', now_date)
-            tag = message.activity_kw.get('tag', '')
-            for node in xrange(2, node_count+1):
-              activity_tool.SQLQueue_writeMessage(uid=uid_list.pop(),
-                                                  path=path,
-                                                  method_id=message.method_id,
-                                                  priority=priority,
-                                                  broadcast=1,
-                                                  processing_node=node,
-                                                  message=dumped_message,
-                                                  date=date,
-                                                  tag=tag)
-          get_transaction().commit()
-        else:
-          # Select a processing node. If the same path appears again, dispatch the message to
-          # the same node, so that object caching is more efficient. Otherwise, apply a round
-          # robin scheduling.
-          node = path_dict.get(path)
-          round_robin_scheduling = message.activity_kw.get('round_robin_scheduling', 0)
-          if node is None:
-            node = processing_node
-	    if not round_robin_scheduling:
-              path_dict[path] = node
-            processing_node += 1
-            if processing_node > node_count:
-              processing_node = 1
-
-          assignMessage(processing_node=node, uid=message.uid, broadcast=0)
-          get_transaction().commit() # Release locks immediately to allow processing of messages
-      LAST_PROCESSING_NODE = processing_node
+      now_date = self.getNow(activity_tool)
+      result = readMessageList(path=None, method_id=None, processing_node=-1,
+                               to_date=now_date, include_processing=0, 
+                               offset=offset, limit=READ_MESSAGE_LIMIT)
+      validated_count = 0
+      #TIME_begin = time()
+      while len(result) and validated_count < MAX_VALIDATED_LIMIT:
+        get_transaction().commit()
+
+        validation_text_dict = {'none': 1}
+        message_dict = {}
+        for line in result:
+          message = self.loadMessage(line.message, uid = line.uid)
+          message.order_validation_text = self.getOrderValidationText(message)
+          self.getExecutableMessageList(activity_tool, message, message_dict,
+                                        validation_text_dict, now_date=now_date)
+        distributable_count = len(message_dict)
+        if distributable_count:
+          activity_tool.SQLQueue_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
+          validated_count += distributable_count
+        if validated_count < MAX_VALIDATED_LIMIT:
+          offset += READ_MESSAGE_LIMIT
+          result = readMessageList(path=None, method_id=None, processing_node=-1,
+                                   to_date=now_date, include_processing=0, 
+                                   offset=offset, limit=READ_MESSAGE_LIMIT)
+      #TIME_end = time()
+      #LOG('SQLQueue.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset + len(result), validated_count))
 
   # Validation private methods
   def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):

Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Fri Nov 23 11:02:39 2007
@@ -130,14 +130,16 @@
 
   def getObjectList(self, activity_tool):
     """return the list of object that can be expanded from this message."""
+    object_list = []
     try:
-      expand_method_id = self.activity_kw['expand_method_id']
-      obj = self.getObject(activity_tool)
-      # FIXME: how to pass parameters?
-      object_list = getattr(obj, expand_method_id)()
+      object_list.append(self.getObject(activity_tool))
     except KeyError:
-      object_list = [self.getObject(activity_tool)]
-
+      pass
+    else:
+      if self.hasExpandMethod():
+        expand_method_id = self.activity_kw['expand_method_id']
+        # FIXME: how to pass parameters?
+        object_list = getattr(object_list[0], expand_method_id)()
     return object_list
 
   def hasExpandMethod(self):

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_getNow.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_getNow.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLBase_getNow.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_getNow.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,11 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params></params>
+SELECT NOW()

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql Fri Nov 23 11:02:39 2007
@@ -11,7 +11,7 @@
 processing_node
 method_id
 uid
-broadcast</params>
+</params>
 UPDATE message
 SET
   processing_node=<dtml-sqlvar processing_node type="int">,
@@ -26,6 +26,5 @@
   AND path = <dtml-sqlvar path type="string">
 <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
 </dtml-if>
-<dtml-if broadcast>
-  AND broadcast = <dtml-sqlvar broadcast type="int">
-</dtml-if>
+<dtml-var sql_delimiter>
+COMMIT

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql Fri Nov 23 11:02:39 2007
@@ -11,7 +11,7 @@
 UPDATE
   message
 SET
-  processing="0"
+  processing=0,
+  processing_node=0
 WHERE
-  processing="1"
-  AND processing_node="<dtml-sqlvar processing_node type="int">"
+  processing_node=<dtml-sqlvar processing_node type="int">

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql Fri Nov 23 11:02:39 2007
@@ -17,7 +17,6 @@
   `processing` TINYINT NOT NULL DEFAULT 0,
   `processing_date` DATETIME,
   `priority` TINYINT NOT NULL DEFAULT 0,
-  `broadcast` TINYINT NOT NULL DEFAULT 0,
   `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
   `tag` VARCHAR(255) NOT NULL,
   `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
@@ -26,8 +25,7 @@
   PRIMARY KEY (`uid`),
   KEY (`path`),
   KEY (`method_id`),
-  KEY (`processing_node`),
-  KEY (`processing`),
+  KEY `processing_node_processing` (`processing_node`, `processing`),
   KEY (`priority`),
   KEY (`tag`),
   KEY (`order_validation_text`)

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,32 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>
+processing_node
+uid
+to_date
+path
+method_id
+group_method_id
+order_validation_text
+tag
+</params>
+DELETE FROM
+  message
+WHERE
+  processing_node IN (0, <dtml-sqlvar processing_node type="int">)
+  AND uid != <dtml-sqlvar uid type="int">
+  AND date <= <dtml-sqlvar to_date type="datetime">
+  AND path = <dtml-sqlvar path type="string">
+  AND method_id = <dtml-sqlvar method_id type="string">
+  AND group_method_id = <dtml-sqlvar group_method_id type="string">
+  AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
+  AND tag IN ('', <dtml-sqlvar tag type="string">)
+<dtml-var sql_delimiter>
+COMMIT

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_makeMessageListAvailable.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>uid_list</params>
+UPDATE
+  message
+SET
+  processing_node=0,
+  processing=0
+WHERE
+  uid IN (
+  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
+  )
+<dtml-var sql_delimiter>
+COMMIT

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql Fri Nov 23 11:02:39 2007
@@ -7,14 +7,14 @@
 class_name:
 class_file:
 </dtml-comment>
-<params>uid
-processing_node</params>
+<params>uid</params>
 UPDATE message
 SET
-  processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">,
-  processing = 1,
-  processing_node = <dtml-sqlvar processing_node type="int">
+  processing_date = NOW(),
+  processing = 1
 WHERE
   uid IN (
 <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
   )
+<dtml-var sql_delimiter>
+COMMIT

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessage.zsql Fri Nov 23 11:02:39 2007
@@ -1,31 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-priority
-to_date
-to_processing_date
-group_method_id
-order_validation_text</params>
-SELECT * FROM
-    message
-WHERE
-    processing = 0
-<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
-<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
-<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
-<dtml-if group_method_id>AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
-<dtml-if order_validation_text>AND order_validation_text = <dtml-sqlvar order_validation_text type="string"> </dtml-if>
-ORDER BY
-    priority, date, uid
-<dtml-if group_method_id>
-LIMIT 100
-<dtml-else>
-LIMIT 1
-</dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -1,7 +1,7 @@
 <dtml-comment>
 title:
 connection_id:cmf_activity_sql_connection
-max_rows:1000
+max_rows:0
 max_cache:0
 cache_time:0
 class_name:
@@ -12,7 +12,10 @@
 processing_node
 priority
 include_processing
-to_date</params>
+to_date
+offset:int=0
+count:int=1000
+</params>
 SELECT * FROM
     message
 WHERE
@@ -20,10 +23,11 @@
 <dtml-if expr="not(include_processing)">
     AND processing = 0
 </dtml-if>
-<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
+<dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
 <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
 <dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
 <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
 <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
 ORDER BY
      priority, date, uid
+LIMIT <dtml-sqlvar offset type="int">, <dtml-sqlvar count type="int">

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql Fri Nov 23 11:02:39 2007
@@ -17,7 +17,7 @@
     message
 WHERE
     processing = 0
-<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
+<dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
 <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
 <dtml-if path> AND path = <dtml-sqlvar path type="string"> </dtml-if>
 <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,30 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>processing_node
+to_date
+limit
+group_method_id
+order_validation_text</params>
+UPDATE
+  message
+SET
+  processing_node=<dtml-sqlvar processing_node type="int">
+WHERE
+  processing_node=0
+  AND date <= <dtml-sqlvar to_date type="datetime">
+  <dtml-if group_method_id> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
+  <dtml-if order_validation_text> AND order_validation_text = <dtml-sqlvar order_validation_text type="string"> </dtml-if>
+ORDER BY
+  priority, date, uid
+<dtml-if limit>
+  LIMIT <dtml-sqlvar limit type="int">
+</dtml-if>
+<dtml-var sql_delimiter>
+COMMIT

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectReservedMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>processing_node
+limit</params>
+SELECT
+  *
+FROM
+  message
+WHERE
+  processing_node = <dtml-sqlvar processing_node type="int">
+  AND processing = 0
+<dtml-if limit>
+  LIMIT <dtml-sqlvar limit type="int">
+</dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql Fri Nov 23 11:02:39 2007
@@ -36,7 +36,7 @@
   <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
   )
 </dtml-if>
-<dtml-if processing_node>
+<dtml-if expr="_.getattr(_, 'processing_node', None) is not None">
   AND processing_node = <dtml-sqlvar processing_node type="int">
 </dtml-if>
 <dtml-if order_validation_text>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_timeShift.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_timeShift.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_timeShift.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_timeShift.zsql Fri Nov 23 11:02:39 2007
@@ -20,6 +20,6 @@
 </dtml-if> 
 WHERE
   1 = 1
-<dtml-if processing_node>
+<dtml-if expr="processing_node is not None">
   AND processing_node = <dtml-sqlvar processing_node type="int">
 </dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessage.zsql Fri Nov 23 11:02:39 2007
@@ -12,7 +12,6 @@
 method_id
 message
 priority
-broadcast
 date
 processing_node=-1
 group_method_id
@@ -22,12 +21,11 @@
 SET
   uid = <dtml-sqlvar uid type="int">,
   path = <dtml-sqlvar path type="string">,
-  <dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if> 
+  date = <dtml-if date><dtml-sqlvar date type="datetime"><dtml-else>NOW()</dtml-if>,
   method_id = <dtml-sqlvar method_id type="string">,
   processing_node = <dtml-sqlvar processing_node type="int">,
   processing = 0,
   priority = <dtml-sqlvar priority type="int">,
-  broadcast = <dtml-sqlvar broadcast type="int">,
   group_method_id = <dtml-sqlvar group_method_id type="string">,
   tag = <dtml-sqlvar tag type="string">,
   order_validation_text = <dtml-sqlvar order_validation_text type="string">,

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -12,26 +12,24 @@
 method_id_list
 message_list
 priority_list
-broadcast_list
 date_list
 processing_node_list
 group_method_id_list
 tag_list
 order_validation_text_list</params>
 INSERT INTO message
-(uid, path, date, method_id, processing_node, processing, priority, broadcast, group_method_id, tag, order_validation_text, message)
+(uid, path, date, method_id, processing_node, processing, priority, group_method_id, tag, order_validation_text, message)
 VALUES
 <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
 <dtml-if sequence-start><dtml-else>,</dtml-if>
 (
   <dtml-sqlvar expr="uid_list[loop_item]" type="int">,
   <dtml-sqlvar expr="path_list[loop_item]" type="string">,
-  <dtml-if date_list><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else><dtml-sqlvar "_.DateTime()" type="datetime"></dtml-if>, 
+  <dtml-if date_list><dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else>NOW()</dtml-if><dtml-else>NOW()</dtml-if>, 
   <dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
   <dtml-if processing_node_list><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
   0,
   <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
-  <dtml-sqlvar expr="broadcast_list[loop_item]" type="int">,
   <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
   <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
   <dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">,

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql Fri Nov 23 11:02:39 2007
@@ -10,7 +10,6 @@
 <params>path
 processing_node
 method_id
-broadcast
 uid</params>
 UPDATE message_queue
 SET
@@ -18,9 +17,11 @@
   processing=0
 WHERE
 <dtml-if path> path = <dtml-sqlvar path type="string"> 
-<dtml-else> uid = <dtml-sqlvar uid type="int"> </dtml-if>
+<dtml-else>
+  uid IN (
+<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
+)
+</dtml-if>
 <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
-<dtml-if broadcast>
-  AND broadcast = <dtml-sqlvar broadcast type="int">
-</dtml-if>
-
+<dtml-var sql_delimiter>
+COMMIT

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql Fri Nov 23 11:02:39 2007
@@ -11,7 +11,7 @@
 UPDATE
   message_queue
 SET
-  processing="0"
+  processing=0,
+  processing_node=0
 WHERE
-  processing="1"
-  AND processing_node="<dtml-sqlvar processing_node type="int">"
+  processing_node=<dtml-sqlvar processing_node type="int">

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql Fri Nov 23 11:02:39 2007
@@ -17,15 +17,13 @@
   `processing` INT DEFAULT 0,
   `processing_date` datetime,
   `priority` INT DEFAULT 0,
-  `broadcast` INT DEFAULT 0,
   `tag` VARCHAR(255),
   `message` LONGBLOB,
   PRIMARY KEY  (`uid`),
   KEY `date` (`date`),
   KEY `path` (`path`),
   KEY `method_id` (`method_id`),
-  KEY `processing_node` (`processing_node`),
-  KEY `processing` (`processing`),
+  KEY `processing_node_processing` (`processing_node`, `processing`),
   KEY `processing_date` (`processing_date`),
   KEY `priority` (`priority`),
   KEY `tag` (`tag`)

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_makeMessageListAvailable.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>uid_list</params>
+UPDATE
+  message_queue
+SET
+  processing_node=0,
+  processing=0
+WHERE
+  uid IN (
+  <dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
+  )
+<dtml-var sql_delimiter>
+COMMIT

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_processMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_processMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_processMessage.zsql Fri Nov 23 11:02:39 2007
@@ -11,7 +11,11 @@
 UPDATE
   message_queue
 SET
-  processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">,
+  processing_date = NOW(),
   processing=1
 WHERE
-  uid = <dtml-sqlvar uid type="int">
+  uid IN (
+<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
+  )
+<dtml-var sql_delimiter>
+COMMIT 

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql Fri Nov 23 11:02:39 2007
@@ -1,22 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>processing_node
-priority
-to_date</params>
-SELECT * FROM
-    message_queue
-WHERE
-    processing = 0
-<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
-<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
-<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
-
-ORDER BY
-    priority, date, uid

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -1,7 +1,7 @@
 <dtml-comment>
 title:
 connection_id:cmf_activity_sql_connection
-max_rows:1000
+max_rows:0
 max_cache:0
 cache_time:0
 class_name:
@@ -11,15 +11,19 @@
 method_id
 processing_node
 priority
-to_date</params>
+to_date
+offset:int=0
+count:int=1000
+</params>
 SELECT * FROM
     message_queue
 WHERE
     processing = 0
-<dtml-if processing_node>AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
+<dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
 <dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
 <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
 <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
 <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
 ORDER BY
     priority, date, uid
+LIMIT <dtml-sqlvar offset type="int">, <dtml-sqlvar count type="int">

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql Fri Nov 23 11:02:39 2007
@@ -15,6 +15,6 @@
     message_queue
 WHERE
     processing = 0
-<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
+<dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
 <dtml-if path> AND path = <dtml-sqlvar path type="string"></dtml-if>
 <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_reserveMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,27 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>processing_node
+to_date
+limit
+</params>
+UPDATE
+  message_queue
+SET
+  processing_node=<dtml-sqlvar processing_node type="int">
+WHERE
+  processing_node=0
+  AND date <= <dtml-sqlvar to_date type="datetime">
+ORDER BY
+  priority, date, uid
+<dtml-if limit>
+  LIMIT <dtml-sqlvar limit type="int">
+</dtml-if>
+<dtml-var sql_delimiter>
+COMMIT

Added: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql?rev=17759&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_selectReservedMessageList.zsql Fri Nov 23 11:02:39 2007
@@ -1,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>processing_node
+limit</params>
+SELECT
+  *
+FROM
+  message_queue
+WHERE
+  processing_node = <dtml-sqlvar processing_node type="int">
+  AND processing = 0
+<dtml-if limit>
+  LIMIT <dtml-sqlvar limit type="int">
+</dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql Fri Nov 23 11:02:39 2007
@@ -9,12 +9,21 @@
 </dtml-comment>
 <params>uid
 priority
+delay
 date</params>
 UPDATE
     message_queue
 SET
-    priority = <dtml-sqlvar priority type="int">,
-    processing = 0,
-    date = <dtml-sqlvar date type="datetime">  
+    processing = 0
+    <dtml-if priority>
+    , priority = <dtml-sqlvar priority type="int">
+    </dtml-if>
+    <dtml-if delay>
+    , date = DATE_ADD(NOW(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
+    <dtml-elif date>
+    , date = <dtml-sqlvar date type="datetime">
+    </dtml-if>
 WHERE
-    uid = <dtml-sqlvar uid type="int">
+  uid IN (
+  <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
+  )

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_timeShift.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_timeShift.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_timeShift.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_timeShift.zsql Fri Nov 23 11:02:39 2007
@@ -16,6 +16,6 @@
   processing_date = processing_date - <dtml-sqlvar delay type="int">
 WHERE
   1 = 1
-<dtml-if processing_node>
+<dtml-if expr="processing_node is not None">
   AND processing_node = <dtml-sqlvar processing_node type="int">
-</dtml-if>
+</dtml-if>

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql?rev=17759&r1=17758&r2=17759&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql Fri Nov 23 11:02:39 2007
@@ -12,7 +12,6 @@
 method_id
 message
 priority
-broadcast
 processing_node
 date
 tag</params>
@@ -20,12 +19,11 @@
 SET
   uid = <dtml-sqlvar uid type="int">,
   path = <dtml-sqlvar path type="string">,
-  <dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if> 
+  date = <dtml-if date><dtml-sqlvar date type="datetime"><dtml-else>NOW()</dtml-if>,
   method_id = <dtml-sqlvar method_id type="string">,
 <dtml-if processing_node>
   processing_node = <dtml-sqlvar processing_node type="int">,
 </dtml-if>
-  broadcast = <dtml-sqlvar broadcast type="int">,
   processing = 0,
   priority = <dtml-sqlvar priority type="int">,
   tag = <dtml-sqlvar tag type="string">,




More information about the Erp5-report mailing list