[Erp5-report] r32876 jm - in /erp5/trunk/products/CMFActivity: Activity/ skins/activity/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Feb 19 18:55:56 CET 2010


Author: jm
Date: Fri Feb 19 18:55:56 2010
New Revision: 32876

URL: http://svn.erp5.org?rev=32876&view=rev
Log:
Refactoring: move finalizeMessageExecution to SQLBase

Added:
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql
      - copied, changed from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql
      - copied, changed from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql
      - copied, changed from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
Removed:
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLBase.py
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
    erp5/trunk/products/CMFActivity/tests/testCMFActivity.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLBase.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLBase.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLBase.py [utf8] Fri Feb 19 18:55:56 2010
@@ -26,8 +26,17 @@
 #
 ##############################################################################
 
-from zLOG import LOG, INFO, WARNING
+import sys
+from zLOG import LOG, TRACE, INFO, WARNING, ERROR
 from ZODB.POSException import ConflictError
+from Products.CMFActivity.ActivityTool import (
+  MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
+from Products.CMFActivity.ActiveObject import (
+  INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
+from Queue import VALIDATION_ERROR_DELAY
+
+MAX_PRIORITY = 5
+
 
 class SQLBase:
   """
@@ -74,19 +83,14 @@
       priority = default
     return priority
 
-  def _retryOnLockError(self, method, args=(), kw=None):
-    if kw is None:
-      kw = {}
+  def _retryOnLockError(self, method, args=(), kw={}):
     while True:
       try:
-        result = method(*args, **kw)
+        return method(*args, **kw)
       except ConflictError:
         # Note that this code assumes that a database adapter translates
         # a lock error into a conflict error.
         LOG('SQLBase', INFO, 'Got a lock error, retrying...')
-      else:
-        break
-    return result
 
   def _validate_after_method_id(self, activity_tool, message, value):
     return self._validate(activity_tool, method_id=value)
@@ -117,3 +121,115 @@
 
   def _validate_serialization_tag(self, activity_tool, message, value):
     return self._validate(activity_tool, serialization_tag=value)
+
+  def _log(self, severity, summary):
+    LOG(self.__class__.__name__, severity, summary,
+        error=severity>INFO and sys.exc_info() or None)
+
+  def finalizeMessageExecution(self, activity_tool, message_list,
+                               uid_to_duplicate_uid_list_dict=None):
+    """
+      If everything was fine, delete all messages.
+      If anything failed, make successful messages available (if any), and
+      the following rules apply to failed messages:
+        - Failures due to ConflictErrors cause messages to be postponed,
+          but their priority is *not* increased.
+        - Failures of messages already above maximum priority cause them to
+          be put in a permanent-error state.
+        - In all other cases, priority is increased and message is delayed.
+    """
+    deletable_uid_list = []
+    delay_uid_list = []
+    final_error_uid_list = []
+    make_available_uid_list = []
+    notify_user_list = []
+    non_executable_message_list = []
+    executed_uid_list = deletable_uid_list
+    if uid_to_duplicate_uid_list_dict is not None:
+      for m in message_list:
+        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
+          executed_uid_list = make_available_uid_list
+          break
+    for m in message_list:
+      uid = m.uid
+      if m.getExecutionState() == MESSAGE_EXECUTED:
+        executed_uid_list.append(uid)
+        if uid_to_duplicate_uid_list_dict is not None:
+          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
+      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
+        # Should duplicate messages follow strictly the original message, or
+        # should they be just made available again ?
+        if uid_to_duplicate_uid_list_dict is not None:
+          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
+        priority = m.line.priority
+        # BACK: Only exceptions can be classes in Python 2.6.
+        # Once we drop support for Python 2.4,
+        # please, remove the "type(m.exc_type) is type(ConflictError)" check
+        # and leave only the "issubclass(m.exc_type, ConflictError)" check.
+        if type(m.exc_type) is type(ConflictError) and \
+           issubclass(m.exc_type, ConflictError):
+          delay_uid_list.append(uid)
+        elif priority > MAX_PRIORITY:
+          notify_user_list.append(m)
+          final_error_uid_list.append(uid)
+        else:
+          try:
+            # Immediately update, because values different for every message
+            activity_tool.SQLBase_reactivate(table=self.sql_table,
+                                             uid=[uid],
+                                             delay=None,
+                                             priority=priority + 1)
+          except:
+            self._log(WARNING, 'Failed to increase priority of %r' % uid)
+          delay_uid_list.append(uid)
+      else:
+        # Internal CMFActivity error: the message can not be executed because
+        # something is missing (context object cannot be found, method cannot
+        # be accessed on object).
+        non_executable_message_list.append(uid)
+    if deletable_uid_list:
+      try:
+        self._retryOnLockError(activity_tool.SQLBase_delMessage,
+                               kw={'table': self.sql_table,
+                                   'uid': deletable_uid_list})
+      except:
+        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
+      else:
+        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
+    if delay_uid_list:
+      try:
+        # If this is a conflict error, do not lower the priority but only delay.
+        activity_tool.SQLBase_reactivate(table=self.sql_table,
+          uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None)
+      except:
+        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
+      make_available_uid_list += delay_uid_list
+    if final_error_uid_list:
+      try:
+        activity_tool.SQLBase_assignMessage(table=self.sql_table,
+          uid=final_error_uid_list, processing_node=INVOKE_ERROR_STATE)
+      except:
+        self._log(ERROR, 'Failed to set message to error state for %r'
+                         % final_error_uid_list)
+    if non_executable_message_list:
+      try:
+        activity_tool.SQLBase_assignMessage(table=self.sql_table,
+          uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
+      except:
+        self._log(ERROR, 'Failed to set message to invalid path state for %r'
+                         % non_executable_message_list)
+    if make_available_uid_list:
+      try:
+        self.makeMessageListAvailable(activity_tool=activity_tool,
+                                      uid_list=make_available_uid_list)
+      except:
+        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
+      else:
+        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
+    try:
+      for m in notify_user_list:
+        m.notifyUser(activity_tool)
+    except:
+      # Notification failures must not cause this method to raise.
+      self._log(WARNING,
+        'Exception during notification phase of finalizeMessageExecution')

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py [utf8] Fri Feb 19 18:55:56 2010
@@ -27,8 +27,7 @@
 ##############################################################################
 
 from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
-from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
-        abortTransactionSynchronously
+from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
 from RAMDict import RAMDict
 from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 from Products.CMFActivity.Errors import ActivityFlushError
@@ -47,7 +46,6 @@
 
 from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
 
-MAX_PRIORITY = 5
 # Stop validating more messages when this limit is reached
 MAX_VALIDATED_LIMIT = 1000 
 # Read up to this number of messages to validate.
@@ -64,6 +62,8 @@
     and provide sequentiality. Should not create conflict
     because use of OOBTree.
   """
+  sql_table = 'message'
+
   # Transaction commit methods
   def prepareQueueMessageList(self, activity_tool, message_list):
     message_list = [m for m in message_list if m.is_registered]
@@ -104,7 +104,7 @@
                                                  order_validation_text = order_validation_text)
     uid_list = [x.uid for x in uid_list]
     if len(uid_list)>0:
-      activity_tool.SQLDict_delMessage(uid = uid_list)
+      activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
 
   def finishQueueMessage(self, activity_tool_path, m):
     # Nothing to do in SQLDict.
@@ -130,26 +130,6 @@
   def getRegisteredMessageList(self, activity_buffer, activity_tool):
     message_list = activity_buffer.getMessageList(self)
     return [m for m in message_list if m.is_registered]
-  
-  def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
-    validation_state = message.validate(self, activity_tool, check_order_validation=0)
-    if validation_state is not VALID:
-      # There is a serious validation error - we must lower priority
-      if priority > MAX_PRIORITY:
-        # This is an error
-        if len(uid_list) > 0:
-          activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=VALIDATE_ERROR_STATE)
-                                                                          # Assign message back to 'error' state
-        #m.notifyUser(activity_tool)                                      # Notify Error
-        get_transaction().commit()                                        # and commit
-      else:
-        # Lower priority
-        if len(uid_list) > 0: # Add some delay before new processing
-          activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
-                                            priority=priority + 1, retry=1)
-        get_transaction().commit() # Release locks before starting a potentially long calculation
-      return 0
-    return 1
 
   def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
     """
@@ -320,105 +300,6 @@
       else:
         LOG('SQLDict', TRACE, '(no message was reserved)')
       return [], 0, None, {}
-
-  def finalizeMessageExecution(self, activity_tool, message_list, uid_to_duplicate_uid_list_dict):
-    """
-      If everything was fine, delete all messages.
-      If anything failed, make successful messages available (if any), and
-      the following rules apply to failed messages:
-        - Failures due to ConflictErrors cause messages to be postponed,
-          but their priority is *not* increased.
-        - Failures of messages already above maximum priority cause them to
-          be put in a permanent-error state.
-        - In all other cases, priority is increased and message is delayed.
-    """
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    deletable_uid_list = []
-    delay_uid_list = []
-    final_error_uid_list = []
-    make_available_uid_list = []
-    notify_user_list = []
-    non_executable_message_list = []
-    something_failed = (len([m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0)
-    for m in message_list:
-      uid = m.uid
-      if m.getExecutionState() == MESSAGE_EXECUTED:
-        if something_failed:
-          make_available_uid_list.append(uid)
-          make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
-        else:
-          deletable_uid_list.append(uid)
-          deletable_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
-      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
-        # Should duplicate messages follow strictly the original message, or
-        # should they be just made available again ?
-        make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
-        priority = m.line.priority
-        # BACK: Only exceptions can be classes in Python 2.6.
-        # Once we drop support for Python 2.4, 
-        # please, remove the "type(m.exc_type) is type(ConflictError)" check
-        # and leave only the "issubclass(m.exc_type, ConflictError)" check.
-        if type(m.exc_type) is type(ConflictError) and \
-           issubclass(m.exc_type, ConflictError):
-          delay_uid_list.append(uid)
-        elif priority > MAX_PRIORITY:
-          notify_user_list.append(m)
-          final_error_uid_list.append(uid)
-        else:
-          try:
-            # Immediately update, because values different for every message
-            activity_tool.SQLDict_setPriority(
-              uid=[uid],
-              delay=None,
-              retry=None,
-              priority=priority + 1)
-          except:
-            LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
-          delay_uid_list.append(uid)
-      else:
-        # Internal CMFActivity error: the message can not be executed because
-        # something is missing (context object cannot be found, method cannot
-        # be accessed on object).
-        non_executable_message_list.append(uid)
-    if len(deletable_uid_list):
-      try:
-        self._retryOnLockError(activity_tool.SQLDict_delMessage, kw={'uid': deletable_uid_list})
-      except:
-        LOG('SQLDict', ERROR, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
-      else:
-        LOG('SQLDict', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
-    if len(delay_uid_list):
-      try:
-        # If this is a conflict error, do not lower the priority but only delay.
-        activity_tool.SQLDict_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None, retry=None)
-      except:
-        LOG('SQLDict', ERROR, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
-      make_available_uid_list += delay_uid_list
-    if len(final_error_uid_list):
-      try:
-        activity_tool.SQLDict_assignMessage(uid=final_error_uid_list,
-                                            processing_node=INVOKE_ERROR_STATE)
-      except:
-        LOG('SQLDict', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
-    if len(non_executable_message_list):
-      try:
-        activity_tool.SQLDict_assignMessage(uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
-      except:
-        LOG('SQLDict', ERROR, 'Failed to set message to invalid path state for %r' % (non_executable_message_list, ), error=sys.exc_info())
-    if len(make_available_uid_list):
-      try:
-        makeMessageListAvailable(make_available_uid_list)
-      except:
-        LOG('SQLDict', ERROR, 'Failed to unreserve %r' % (make_available_uid_list, ), error=sys.exc_info())
-      else:
-        LOG('SQLDict', TRACE, 'Freed messages %r' % (make_available_uid_list, ))
-    try:
-      for m in notify_user_list:
-        m.notifyUser(activity_tool)
-    except:
-      # Notification failures must not cause this method to raise.
-      LOG('SQLDict', WARNING, 'Exception during notification phase of finalizeMessageExecution', error=sys.exc_info())
 
   # Queue semantic
   def dequeueMessage(self, activity_tool, processing_node):
@@ -595,7 +476,8 @@
         uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
                                                      order_validation_text=None)
         if len(uid_list)>0:
-          activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list])
+          activity_tool.SQLBase_delMessage(table=self.sql_table,
+                                           uid=[x.uid for x in uid_list])
 
   getMessageList = SQLBase.getMessageList
 
@@ -672,11 +554,13 @@
                 if group_method_id is not None:
                   serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
           if deletable_uid_list:
-            activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
+            activity_tool.SQLBase_delMessage(table=self.sql_table,
+                                             uid=deletable_uid_list)
 
         distributable_count = len(message_dict)
         if distributable_count:
-          activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
+          activity_tool.SQLBase_assignMessage(table=self.sql_table,
+            processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
           validated_count += distributable_count
         if validated_count < MAX_VALIDATED_LIMIT:
           offset += READ_MESSAGE_LIMIT

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py [utf8] Fri Feb 19 18:55:56 2010
@@ -28,8 +28,7 @@
 
 from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
 from RAMQueue import RAMQueue
-from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
-        abortTransactionSynchronously
+from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
 from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 from Products.CMFActivity.Errors import ActivityFlushError
 from ZODB.POSException import ConflictError
@@ -48,7 +47,6 @@
 
 from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
 
-MAX_PRIORITY = 5
 # Stop validating more messages when this limit is reached
 MAX_VALIDATED_LIMIT = 1000
 # Read this many messages to validate.
@@ -73,6 +71,7 @@
     and provide sequentiality. Should not create conflict
     because use of OOBTree.
   """
+  sql_table = 'message_queue'
 
   def prepareQueueMessageList(self, activity_tool, message_list):
     message_list = [m for m in message_list if m.is_registered]
@@ -103,7 +102,7 @@
   def prepareDeleteMessage(self, activity_tool, m):
     # Erase all messages in a single transaction
     #LOG("prepareDeleteMessage", 0, str(m.__dict__))
-    activity_tool.SQLQueue_delMessage(uid = [m.uid])
+    activity_tool.SQLBase_delMessage(table=self.sql_table, uid=[m.uid])
 
   def finishQueueMessage(self, activity_tool_path, m):
     # Nothing to do in SQLQueue.
@@ -198,88 +197,6 @@
       else:
         LOG('SQLQueue', TRACE, '(no message was reserved)')
       return []
-
-  def finalizeMessageExecution(self, activity_tool, message_list):
-    def makeMessageListAvailable(uid_list):
-      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
-    deletable_uid_list = []
-    delay_uid_list = []
-    final_error_uid_list = []
-    notify_user_list = []
-    non_executable_message_list = []
-    for m in message_list:
-      uid = m.uid
-      if m.getExecutionState() == MESSAGE_EXECUTED:
-        deletable_uid_list.append(uid)
-      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
-        priority = m.line.priority
-        # BACK: Only exceptions can be classes in Python 2.6.
-        # Once we drop support for Python 2.4, 
-        # please, remove the "type(m.exc_type) is type(ConflictError)" check
-        # and leave only the "issubclass(m.exc_type, ConflictError)" check.
-        if type(m.exc_type) is type(ConflictError) and \
-           issubclass(m.exc_type, ConflictError):
-          delay_uid_list.append(uid)
-        elif priority > MAX_PRIORITY:
-          notify_user_list.append(m)
-          final_error_uid_list.append(uid)
-        else:
-          try:
-            # Immediately update, because values different for every message
-            activity_tool.SQLQueue_setPriority(
-              uid=[uid],
-              delay=VALIDATION_ERROR_DELAY,
-              priority=priority + 1)
-          except:
-            LOG('SQLQueue', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
-          try:
-            makeMessageListAvailable(delay_uid_list)
-          except:
-            LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
-          else:
-            LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, ))
-      else:
-        # Internal CMFActivity error: the message can not be executed because
-        # something is missing (context object cannot be found, method cannot
-        # be accessed on object).
-        non_executable_message_list.append(uid)
-    if len(deletable_uid_list):
-      try:
-        self._retryOnLockError(activity_tool.SQLQueue_delMessage, kw={'uid': deletable_uid_list})
-      except:
-        LOG('SQLQueue', ERROR, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
-      else:
-        LOG('SQLQueue', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
-    if len(delay_uid_list):
-      try:
-        # If this is a conflict error, do not lower the priority but only delay.
-        activity_tool.SQLQueue_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None)
-      except:
-        LOG('SQLQueue', ERROR, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
-      try:
-        makeMessageListAvailable(delay_uid_list)
-      except:
-        LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
-      else:
-        LOG('SQLQueue', TRACE, 'Freed messages %r' % (delay_uid_list, ))
-    if len(final_error_uid_list):
-      try:
-        activity_tool.SQLQueue_assignMessage(uid=final_error_uid_list,
-                                             processing_node=INVOKE_ERROR_STATE)
-      except:
-        LOG('SQLQueue', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
-    if len(non_executable_message_list):
-      try:
-        activity_tool.SQLQueue_assignMessage(uid=non_executable_message_list,
-                                             processing_node=VALIDATE_ERROR_STATE)
-      except:
-        LOG('SQLQueue', ERROR, 'Failed to set message to invalid path state for %r' % (final_error_uid_list, ), error=sys.exc_info())
-    try:
-      for m in notify_user_list:
-        m.notifyUser(activity_tool)
-    except:
-      # Notification failures must not cause this method to raise.
-      LOG('SQLQueue', WARNING, 'Exception during notification phase of finalizeMessageExecution', error=sys.exc_info())
 
   def dequeueMessage(self, activity_tool, processing_node):
     def makeMessageListAvailable(uid_list):
@@ -430,7 +347,8 @@
                 'Could not validate %s on %s' % (m.method_id , path))
 
       if len(result):
-        activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
+        activity_tool.SQLBase_delMessage(table=self.sql_table,
+                                         uid=[line.uid for line in result])
 
   getMessageList = SQLBase.getMessageList
 
@@ -489,7 +407,8 @@
                                         validation_text_dict, now_date=now_date)
         distributable_count = len(message_dict)
         if distributable_count:
-          activity_tool.SQLQueue_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
+          activity_tool.SQLBase_assignMessage(table=self.sql_table,
+            processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
           validated_count += distributable_count
         if validated_count < MAX_VALIDATED_LIMIT:
           offset += READ_MESSAGE_LIMIT

Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql (from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql&r1=32875&r2=32876&rev=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_assignMessage.zsql [utf8] Fri Feb 19 18:55:56 2010
@@ -7,17 +7,16 @@
 class_name:
 class_file:
 </dtml-comment>
-<params>
+<params>table
 processing_node
-uid
+uid:list
 </params>
-UPDATE message
+UPDATE
+  <dtml-var table>
 SET
   processing_node=<dtml-sqlvar processing_node type="int">,
   processing=0
 WHERE
-  uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-)
+  <dtml-sqltest uid type="int" multiple>
 <dtml-var sql_delimiter>
 COMMIT

Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql (from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql&r1=32875&r2=32876&rev=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_delMessage.zsql [utf8] Fri Feb 19 18:55:56 2010
@@ -7,9 +7,10 @@
 class_name:
 class_file:
 </dtml-comment>
-<params>uid:list</params>
+<params>table
+uid:list
+</params>
 DELETE FROM
-	message
+  <dtml-var table>
 WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+  <dtml-sqltest uid type="int" multiple>

Copied: erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql (from r32875, erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql)
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql?p2=erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql&p1=erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql&r1=32875&r2=32876&rev=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLBase_reactivate.zsql [utf8] Fri Feb 19 18:55:56 2010
@@ -7,25 +7,20 @@
 class_name:
 class_file:
 </dtml-comment>
-<params>uid:list
+<params>table
+uid:list
 priority
-retry
 delay
 </params>
 UPDATE
-  message
+  <dtml-var table>
 SET
   processing = 0
 <dtml-if expr="priority is not None">
   , priority = <dtml-sqlvar priority type="int">
 </dtml-if>
 <dtml-if expr="delay is not None">
-  , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> * (retry + 1) SECOND)
-</dtml-if>
-<dtml-if expr="retry is not None">
-  , retry = retry + <dtml-sqlvar retry type="int">
+  , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
 </dtml-if>
 WHERE
-  uid IN (
-  <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-  )
+  <dtml-sqltest uid type="int" multiple>

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_assignMessage.zsql (removed)
@@ -1,23 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>
-processing_node
-uid
-</params>
-UPDATE message
-SET
-  processing_node=<dtml-sqlvar processing_node type="int">,
-  processing=0
-WHERE
-  uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-)
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_delMessage.zsql (removed)
@@ -1,15 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid:list</params>
-DELETE FROM
-	message
-WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_setPriority.zsql (removed)
@@ -1,31 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid:list
-priority
-retry
-delay
-</params>
-UPDATE
-  message
-SET
-  processing = 0
-<dtml-if expr="priority is not None">
-  , priority = <dtml-sqlvar priority type="int">
-</dtml-if>
-<dtml-if expr="delay is not None">
-  , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> * (retry + 1) SECOND)
-</dtml-if>
-<dtml-if expr="retry is not None">
-  , retry = retry + <dtml-sqlvar retry type="int">
-</dtml-if>
-WHERE
-  uid IN (
-  <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-  )

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql (removed)
@@ -1,22 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>
-processing_node
-uid</params>
-UPDATE message_queue
-SET
-  processing_node=<dtml-sqlvar processing_node type="int">,
-  processing=0
-WHERE
-  uid IN (
-<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-)
-<dtml-var sql_delimiter>
-COMMIT

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql (removed)
@@ -1,15 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid</params>
-DELETE FROM
-  message_queue
-WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>

Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql?rev=32875&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql [utf8] (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql (removed)
@@ -1,27 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:1000
-max_cache:100
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>uid
-priority
-delay
-</params>
-UPDATE
-    message_queue
-SET
-    processing = 0
-    <dtml-if expr="priority is not None">
-    , priority = <dtml-sqlvar priority type="int">
-    </dtml-if>
-    <dtml-if expr="delay is not None">
-    , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
-    </dtml-if>
-WHERE
-  uid IN (
-  <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
-  )

Modified: erp5/trunk/products/CMFActivity/tests/testCMFActivity.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/tests/testCMFActivity.py?rev=32876&r1=32875&r2=32876&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] (original)
+++ erp5/trunk/products/CMFActivity/tests/testCMFActivity.py [utf8] Fri Feb 19 18:55:56 2010
@@ -35,6 +35,7 @@
 from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
                                               VALIDATE_ERROR_STATE
 from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
+from Products.CMFActivity.Activity.SQLDict import SQLDict
 from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError
 #from Products.ERP5Type.Document.Organisation import Organisation
 # The above cannot be imported at top level because it doesn't exist until
@@ -3512,7 +3513,8 @@
                        1)
     finally:
       # Clear activities from all nodes
-      activity_tool.SQLDict_delMessage(uid=[message.uid for message in result])
+      activity_tool.SQLBase_delMessage(table=SQLDict.sql_table,
+                                       uid=[message.uid for message in result])
       get_transaction().commit()
 
   def test_116_RaiseInCommitBeforeMessageExecution(self):




More information about the Erp5-report mailing list