[Erp5-report] r8156 - in /erp5/trunk/products/CMFActivity/Activity: SQLDict.py SQLQueue.py

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Jun 23 13:58:32 CEST 2006


Author: yo
Date: Fri Jun 23 13:58:28 2006
New Revision: 8156

URL: http://svn.erp5.org?rev=8156&view=rev
Log:
Do not use hasattr, because hasattr swallows exceptions raised during attribute lookup; use getattr with a default of None instead.

Modified:
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=8156&r1=8155&r2=8156&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Jun 23 13:58:28 2006
@@ -103,7 +103,7 @@
                                               group_method_id_list = group_method_id_list,
                                               tag_list = tag_list,
                                               order_validation_text_list = order_validation_text_list)
-                                                         
+
   def prepareDeleteMessage(self, activity_tool, m):
     # Erase all messages in a single transaction
     path = '/'.join(m.object_path)
@@ -165,7 +165,7 @@
       # string is false, so we must use a non-empty string for this.
       return 'none'
     return sha.new(repr(order_validation_item_list)).hexdigest()
-    
+
   def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
     validation_state = message.validate(self, activity_tool)
     if validation_state is not VALID:
@@ -195,159 +195,163 @@
         get_transaction().commit() # Release locks before starting a potentially long calculation
       return 0
     return 1
-  
+
   # Queue semantic
   def dequeueMessage(self, activity_tool, processing_node):
-    if hasattr(activity_tool,'SQLDict_readMessage'):
-      now_date = DateTime()
-      priority = random.choice(priority_weight)
-      # Try to find a message at given priority level which is scheduled for now
-      result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
-                                                 to_date=now_date)
-      if len(result) == 0:
-        # If empty, take any message which is scheduled for now
-        priority = None
-        result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
-      if len(result) == 0:
-        # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
-        # objects quickly.
-        self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
-      elif len(result) > 0:
-        #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
-        line = result[0]
-        path = line.path
-        method_id = line.method_id
-        order_validation_text = line.order_validation_text
-        uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, 
-                                                     processing_node = None, to_date = now_date,
-                                                     order_validation_text = order_validation_text)
-        uid_list = [x.uid for x in uid_list]
-        uid_list_list = [uid_list]
-        priority_list = [line.priority]
-        # Make sure message can not be processed anylonger
-        if len(uid_list) > 0:
-          # Set selected messages to processing
-          activity_tool.SQLDict_processMessage(uid = uid_list)
-        get_transaction().commit() # Release locks before starting a potentially long calculation
-        # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
-        m = self.loadMessage(line.message, uid = line.uid)
-        message_list = [m]
-        # Validate message (make sure object exists, priority OK, etc.)
-        if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
-          group_method_id = m.activity_kw.get('group_method_id')
-          if group_method_id is not None:
-            # Count the number of objects to prevent too many objects.
-            if m.hasExpandMethod():
-              try:
-                count = len(m.getObjectList(activity_tool))
-              except:
-                # Here, simply ignore an exception. The same exception should be handled later.
-                LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
-                count = 0
+    readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
+    if readMessage is None:
+      return 1
+
+    now_date = DateTime()
+    priority = random.choice(priority_weight)
+    # Try to find a message at given priority level which is scheduled for now
+    result = readMessage(processing_node=processing_node, priority=priority,
+                         to_date=now_date)
+    if len(result) == 0:
+      # If empty, take any message which is scheduled for now
+      priority = None
+      result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
+    if len(result) == 0:
+      # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
+      # objects quickly.
+      self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
+    elif len(result) > 0:
+      #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
+      line = result[0]
+      path = line.path
+      method_id = line.method_id
+      order_validation_text = line.order_validation_text
+      uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
+                                                    processing_node = None, to_date = now_date,
+                                                    order_validation_text = order_validation_text)
+      uid_list = [x.uid for x in uid_list]
+      uid_list_list = [uid_list]
+      priority_list = [line.priority]
+      # Make sure message can not be processed anylonger
+      if len(uid_list) > 0:
+        # Set selected messages to processing
+        activity_tool.SQLDict_processMessage(uid = uid_list)
+      get_transaction().commit() # Release locks before starting a potentially long calculation
+      # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
+      m = self.loadMessage(line.message, uid = line.uid)
+      message_list = [m]
+      # Validate message (make sure object exists, priority OK, etc.)
+      if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+        group_method_id = m.activity_kw.get('group_method_id')
+        if group_method_id is not None:
+          # Count the number of objects to prevent too many objects.
+          if m.hasExpandMethod():
+            try:
+              count = len(m.getObjectList(activity_tool))
+            except:
+              # Here, simply ignore an exception. The same exception should be handled later.
+              LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
+              count = 0
+          else:
+            count = 1
+
+          group_method = activity_tool.restrictedTraverse(group_method_id)
+
+          if count < MAX_GROUPED_OBJECTS:
+            # Retrieve objects which have the same group method.
+            result = readMessage(processing_node = processing_node, priority = priority,
+                                 to_date = now_date, group_method_id = group_method_id,
+                                 order_validation_text = order_validation_text)
+            #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
+            for line in result:
+              path = line.path
+              method_id = line.method_id
+              uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
+                                                            processing_node = None, to_date = now_date,
+                                                            order_validation_text = order_validation_text)
+              uid_list = [x.uid for x in uid_list]
+              if len(uid_list) > 0:
+                # Set selected messages to processing
+                activity_tool.SQLDict_processMessage(uid = uid_list)
+              get_transaction().commit() # Release locks before starting a potentially long calculation
+              m = self.loadMessage(line.message, uid = line.uid)
+              if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+                if m.hasExpandMethod():
+                  try:
+                    count += len(m.getObjectList(activity_tool))
+                  except:
+                    # Here, simply ignore an exception. The same exception should be handled later.
+                    LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
+                    pass
+                else:
+                  count += 1
+                message_list.append(m)
+                uid_list_list.append(uid_list)
+                priority_list.append(line.priority)
+                if count >= MAX_GROUPED_OBJECTS:
+                  break
+
+        # Release locks before starting a potentially long calculation
+        get_transaction().commit()
+        # Try to invoke
+        if group_method_id is not None:
+          LOG('SQLDict', TRACE,
+              'invoking a group method %s with %d objects '\
+              ' (%d objects in expanded form)' % (
+              group_method_id, len(message_list), count))
+          activity_tool.invokeGroup(group_method_id, message_list)
+        else:
+          activity_tool.invoke(message_list[0])
+
+        # Check if messages are executed successfully.
+        # When some of them are executed successfully, it may not be acceptable to
+        # abort the transaction, because these remain pending, only due to other
+        # invalid messages. This means that a group method should not be used if
+        # it has a side effect. For now, only indexing uses a group method, and this
+        # has no side effect.
+        for m in message_list:
+          if m.is_executed:
+            break
+        else:
+          get_transaction().abort()
+
+        for i in xrange(len(message_list)):
+          m = message_list[i]
+          uid_list = uid_list_list[i]
+          priority = priority_list[i]
+          if m.is_executed:
+            activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
+            get_transaction().commit()                                        # If successful, commit
+            if m.active_process:
+              active_process = activity_tool.unrestrictedTraverse(m.active_process)
+              if not active_process.hasActivity():
+                # No more activity
+                m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
+          else:
+            if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
+              # If this is a conflict error, do not lower the priority but only delay.
+              activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+                                                retry = 1)
+              get_transaction().commit() # Release locks before starting a potentially long calculation
+            elif priority > MAX_PRIORITY:
+              # This is an error
+              if len(uid_list) > 0:
+                activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
+                                                                                # Assign message back to 'error' state
+              m.notifyUser(activity_tool)                                       # Notify Error
+              get_transaction().commit()                                        # and commit
             else:
-              count = 1
-
-            group_method = activity_tool.restrictedTraverse(group_method_id)
-            
-            if count < MAX_GROUPED_OBJECTS:
-              # Retrieve objects which have the same group method.
-              result = activity_tool.SQLDict_readMessage(processing_node = processing_node, priority = priority,
-                                                         to_date = now_date, group_method_id = group_method_id,
-                                                         order_validation_text = order_validation_text)
-              #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
-              for line in result:
-                path = line.path
-                method_id = line.method_id
-                uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
-                                                             processing_node = None, to_date = now_date,
-                                                             order_validation_text = order_validation_text)
-                uid_list = [x.uid for x in uid_list]
-                if len(uid_list) > 0:
-                  # Set selected messages to processing
-                  activity_tool.SQLDict_processMessage(uid = uid_list)
+              # Lower priority
+              if len(uid_list) > 0:
+                activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+                                                  priority = priority + 1, retry = 1)
                 get_transaction().commit() # Release locks before starting a potentially long calculation
-                m = self.loadMessage(line.message, uid = line.uid)
-                if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
-                  if m.hasExpandMethod():
-                    try:
-                      count += len(m.getObjectList(activity_tool))
-                    except:
-                      # Here, simply ignore an exception. The same exception should be handled later.
-                      LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
-                      pass
-                  else:
-                    count += 1
-                  message_list.append(m)
-                  uid_list_list.append(uid_list)
-                  priority_list.append(line.priority)
-                  if count >= MAX_GROUPED_OBJECTS:
-                    break
-                
-          # Release locks before starting a potentially long calculation
-          get_transaction().commit()
-          # Try to invoke
-          if group_method_id is not None:
-            LOG('SQLDict', TRACE,
-                'invoking a group method %s with %d objects '\
-                ' (%d objects in expanded form)' % (
-                group_method_id, len(message_list), count))
-            activity_tool.invokeGroup(group_method_id, message_list)
-          else:
-            activity_tool.invoke(message_list[0]) 
-
-          # Check if messages are executed successfully.
-          # When some of them are executed successfully, it may not be acceptable to
-          # abort the transaction, because these remain pending, only due to other
-          # invalid messages. This means that a group method should not be used if
-          # it has a side effect. For now, only indexing uses a group method, and this
-          # has no side effect.
-          for m in message_list:
-            if m.is_executed:
-              break
-          else:
-            get_transaction().abort()
-            
-          for i in xrange(len(message_list)):
-            m = message_list[i]
-            uid_list = uid_list_list[i]
-            priority = priority_list[i]
-            if m.is_executed:
-              activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
-              get_transaction().commit()                                        # If successful, commit
-              if m.active_process:
-                active_process = activity_tool.unrestrictedTraverse(m.active_process)
-                if not active_process.hasActivity():
-                  # No more activity
-                  m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
-            else:
-              if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
-                # If this is a conflict error, do not lower the priority but only delay.
-                activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
-                                                  retry = 1)
-                get_transaction().commit() # Release locks before starting a potentially long calculation
-              elif priority > MAX_PRIORITY:
-                # This is an error
-                if len(uid_list) > 0:
-                  activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
-                                                                                  # Assign message back to 'error' state
-                m.notifyUser(activity_tool)                                       # Notify Error
-                get_transaction().commit()                                        # and commit
-              else:
-                # Lower priority
-                if len(uid_list) > 0:
-                  activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
-                                                    priority = priority + 1, retry = 1)
-                  get_transaction().commit() # Release locks before starting a potentially long calculation
-                
-        return 0
-      get_transaction().commit() # Release locks before starting a potentially long calculation
+
+      return 0
+    get_transaction().commit() # Release locks before starting a potentially long calculation
     return 1
 
   def hasActivity(self, activity_tool, object, **kw):
-    if hasattr(activity_tool,'SQLDict_readMessageList'):
+    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
+    if hasMessage is not None:
       if object is not None:
         my_object_path = '/'.join(object.getPhysicalPath())
-        result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw)
+        result = hasMessage(path=my_object_path, **kw)
         if len(result) > 0:
           return result[0].message_count > 0
       else:
@@ -369,7 +373,8 @@
     path = '/'.join(object_path)
     # LOG('Flush', 0, str((path, invoke, method_id)))
     method_dict = {}
-    if hasattr(activity_tool,'SQLDict_readMessageList'):
+    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
+    if readMessageList is not None:
       # Parse each message in registered
       for m in activity_tool.getRegisteredMessageList(self):
         if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
@@ -391,8 +396,8 @@
                 raise ActivityFlushError, (
                     'The document %s does not exist' % path)
       # Parse each message in SQL dict
-      result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,
-                             processing_node=None,include_processing=0)
+      result = readMessageList(path=path, method_id=method_id,
+                               processing_node=None,include_processing=0)
       for line in result:
         path = line.path
         method_id = line.method_id
@@ -422,8 +427,10 @@
   def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
     # YO: reading all lines might cause a deadlock
     message_list = []
-    if hasattr(activity_tool,'SQLDict_readMessageList'):
-      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None,include_processing=include_processing)
+    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
+    if readMessageList is not None:
+      result = readMessageList(path=None, method_id=None, processing_node=None,
+                               to_processing_date=None,include_processing=include_processing)
       for line in result:
         m = self.loadMessage(line.message, uid = line.uid)
         m.processing_node = line.processing_node
@@ -435,8 +442,9 @@
   def dumpMessageList(self, activity_tool):
     # Dump all messages in the table.
     message_list = []
-    if hasattr(activity_tool, 'SQLDict_dumpMessageList'):
-      result = activity_tool.SQLDict_dumpMessageList()
+    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
+    if dumpMessageList is not None:
+      result = dumpMessageList()
       for line in result:
         m = self.loadMessage(line.message, uid = line.uid)
         message_list.append(m)
@@ -444,7 +452,8 @@
 
   def distribute(self, activity_tool, node_count):
     processing_node = 1
-    if hasattr(activity_tool,'SQLDict_readMessageList'):
+    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
+    if readMessageList is not None:
       now_date = DateTime()
       if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
         # Sticky processing messages should be set back to non processing
@@ -452,9 +461,9 @@
         self.max_processing_date = now_date
       else:
         max_processing_date = None
-      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1,
-                                                     to_processing_date = max_processing_date,
-                                                     include_processing=0) # Only assign non assigned messages
+      result = readMessageList(path=None, method_id=None, processing_node = -1,
+                               to_processing_date = max_processing_date,
+                               include_processing=0) # Only assign non assigned messages
       get_transaction().commit() # Release locks before starting a potentially long calculation
       path_dict = {}
       for line in result:
@@ -534,7 +543,7 @@
     if result[0].uid_count > 0:
       return INVALID_ORDER
     return VALID
-    
+
   def _validate_after_tag_and_method_id(self, activity_tool, message, value):
     # Count number of occurances of tag and method_id
     if (type(value) != TupleType and type(value) != ListType) or len(value)<2:

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=8156&r1=8155&r2=8156&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Jun 23 13:58:28 2006
@@ -74,72 +74,76 @@
     activity_tool.SQLQueue_delMessage(uid = m.uid)
 
   def dequeueMessage(self, activity_tool, processing_node):
-    if hasattr(activity_tool,'SQLQueue_readMessageList'):
-      now_date = DateTime()
-      # Next processing date in case of error
-      next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
-      priority = random.choice(priority_weight)
-      # Try to find a message at given priority level
-      result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority,
-                                                  to_date=now_date)
-      if len(result) == 0:
-        # If empty, take any message
-        result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None,to_date=now_date)
-      if len(result) > 0:
-        line = result[0]
-        path = line.path
-        method_id = line.method_id
-        # Make sure message can not be processed anylonger
-        activity_tool.SQLQueue_processMessage(uid=line.uid)
-        get_transaction().commit() # Release locks before starting a potentially long calculation
-        m = self.loadMessage(line.message)
-        # Make sure object exists
-        validation_state = m.validate(self, activity_tool)
-        if validation_state is not VALID:
-          if validation_state in (EXCEPTION, INVALID_PATH):
-            if line.priority > MAX_PRIORITY:
-              # This is an error
-              activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
-                                                                                # Assign message back to 'error' state
-              #m.notifyUser(activity_tool)                                       # Notify Error
-              get_transaction().commit()                                        # and commit
-            else:
-              # Lower priority
-              activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
-              get_transaction().commit() # Release locks before starting a potentially long calculation
+    readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
+    if readMessage is None:
+      return 1
+
+    now_date = DateTime()
+    # Next processing date in case of error
+    next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
+    priority = random.choice(priority_weight)
+    # Try to find a message at given priority level
+    result = readMessage(processing_node=processing_node, priority=priority,
+                         to_date=now_date)
+    if len(result) == 0:
+      # If empty, take any message
+      result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
+    if len(result) > 0:
+      line = result[0]
+      path = line.path
+      method_id = line.method_id
+      # Make sure message can not be processed anylonger
+      activity_tool.SQLQueue_processMessage(uid=line.uid)
+      get_transaction().commit() # Release locks before starting a potentially long calculation
+      m = self.loadMessage(line.message)
+      # Make sure object exists
+      validation_state = m.validate(self, activity_tool)
+      if validation_state is not VALID:
+        if validation_state in (EXCEPTION, INVALID_PATH):
+          if line.priority > MAX_PRIORITY:
+            # This is an error
+            activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
+                                                                              # Assign message back to 'error' state
+            #m.notifyUser(activity_tool)                                       # Notify Error
+            get_transaction().commit()                                        # and commit
           else:
-            # We do not lower priority for INVALID_ORDER errors but we do postpone execution
-            activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
-                                                priority = line.priority)
+            # Lower priority
+            activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
             get_transaction().commit() # Release locks before starting a potentially long calculation
         else:
-          # Try to invoke
-          activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
-          if m.is_executed:                                          # Make sure message could be invoked
-            activity_tool.SQLQueue_delMessage(uid=line.uid)  # Delete it
-            get_transaction().commit()                                        # If successful, commit
+          # We do not lower priority for INVALID_ORDER errors but we do postpone execution
+          activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
+                                              priority = line.priority)
+          get_transaction().commit() # Release locks before starting a potentially long calculation
+      else:
+        # Try to invoke
+        activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
+        if m.is_executed:                                          # Make sure message could be invoked
+          activity_tool.SQLQueue_delMessage(uid=line.uid)  # Delete it
+          get_transaction().commit()                                        # If successful, commit
+        else:
+          get_transaction().abort()                                         # If not, abort transaction and start a new one
+          if line.priority > MAX_PRIORITY:
+            # This is an error
+            activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
+                                                                              # Assign message back to 'error' state
+            m.notifyUser(activity_tool)                                       # Notify Error
+            get_transaction().commit()                                        # and commit
           else:
-            get_transaction().abort()                                         # If not, abort transaction and start a new one
-            if line.priority > MAX_PRIORITY:
-              # This is an error
-              activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
-                                                                                # Assign message back to 'error' state
-              m.notifyUser(activity_tool)                                       # Notify Error
-              get_transaction().commit()                                        # and commit
-            else:
-              # Lower priority
-              activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
-                                                 priority = line.priority + 1)
-              get_transaction().commit() # Release locks before starting a potentially long calculation
-        return 0
-      get_transaction().commit() # Release locks before starting a potentially long calculation
+            # Lower priority
+            activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
+                                                priority = line.priority + 1)
+            get_transaction().commit() # Release locks before starting a potentially long calculation
+      return 0
+    get_transaction().commit() # Release locks before starting a potentially long calculation
     return 1
 
   def hasActivity(self, activity_tool, object, **kw):
-    if hasattr(activity_tool,'SQLQueue_readMessageList'):
+    hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
+    if hasMessage is not None:
       if object is not None:
         my_object_path = '/'.join(object.getPhysicalPath())
-        result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw)
+        result = hasMessage(path=my_object_path, **kw)
         if len(result) > 0:
           return result[0].message_count > 0
       else:
@@ -158,7 +162,8 @@
 
       NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
     """
-    if hasattr(activity_tool,'SQLQueue_readMessageList'):
+    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
+    if readMessageList is not None:
       #return # Do nothing here to precent overlocking
       path = '/'.join(object_path)
       # Parse each message in registered
@@ -168,7 +173,7 @@
           activity_tool.unregisterMessage(self, m)
       # Parse each message in SQL queue
       #LOG('Flush', 0, str((path, invoke, method_id)))
-      result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None)
+      result = readMessageList(path=path, method_id=method_id,processing_node=None)
       #LOG('Flush', 0, str(len(result)))
       method_dict = {}
       for line in result:
@@ -202,8 +207,9 @@
 
   def getMessageList(self, activity_tool, processing_node=None,**kw):
     message_list = []
-    if hasattr(activity_tool,'SQLQueue_readMessageList'):
-      result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None)
+    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
+    if readMessageList is not None:
+      result = readMessageList(path=None, method_id=None, processing_node=None)
       for line in result:
         m = self.loadMessage(line.message)
         m.processing_node = line.processing_node
@@ -214,17 +220,19 @@
   def dumpMessageList(self, activity_tool):
     # Dump all messages in the table.
     message_list = []
-    if hasattr(activity_tool, 'SQLQueue_dumpMessageList'):
-      result = activity_tool.SQLQueue_dumpMessageList()
+    dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
+    if dumpMessageList is not None:
+      result = dumpMessageList()
       for line in result:
         m = self.loadMessage(line.message, uid = line.uid)
         message_list.append(m)
     return message_list
-    
+
   def distribute(self, activity_tool, node_count):
     processing_node = 1
-    if hasattr(activity_tool,'SQLQueue_readMessageList'):
-      result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
+    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
+    if readMessageList is not None:
+      result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
       #LOG('distribute count',0,str(len(result)) )
       #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
       #get_transaction().commit() # Release locks before starting a potentially long calculation
@@ -296,7 +304,7 @@
     if result[0].uid_count > 0:
       return INVALID_ORDER
     return VALID
-    
+
   def _validate_after_tag(self, activity_tool, message, value):
     # Count number of occurances of tag
     if type(value) == type(''):
@@ -305,7 +313,7 @@
     if result[0].uid_count > 0:
       return INVALID_ORDER
     return VALID
-    
+
   def _validate_after_tag_and_method_id(self, activity_tool, message, value):
     # Count number of occurances of tag and method_id
     if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2:
@@ -321,7 +329,7 @@
     if result[0].uid_count > 0:
       return INVALID_ORDER
     return VALID
-  
+
   # Required for tests (time shift)
   def timeShift(self, activity_tool, delay, processing_node = None):
     """




More information about the Erp5-report mailing list