[Erp5-report] r13188 - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Mar 2 15:48:51 CET 2007


Author: yo
Date: Fri Mar  2 15:48:48 2007
New Revision: 13188

URL: http://svn.erp5.org?rev=13188&view=rev
Log:
Use RAM-based approach for ActivityBuffer instead of a volatile attribute. Clean up some mess.

Modified:
    erp5/trunk/products/CMFActivity/ActiveObject.py
    erp5/trunk/products/CMFActivity/Activity/Queue.py
    erp5/trunk/products/CMFActivity/Activity/RAMDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLDict.py
    erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
    erp5/trunk/products/CMFActivity/ActivityBuffer.py
    erp5/trunk/products/CMFActivity/ActivityTool.py
    erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql

Modified: erp5/trunk/products/CMFActivity/ActiveObject.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActiveObject.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActiveObject.py (original)
+++ erp5/trunk/products/CMFActivity/ActiveObject.py Fri Mar  2 15:48:48 2007
@@ -134,9 +134,9 @@
   def recursiveFlushActivity(self, invoke=0, **kw):
     # flush all activities related to this object
     self.flushActivity(invoke=invoke, **kw)
-    if hasattr(aq_base(self), 'objectValues'):
+    if getattr(aq_base(self), 'objectValues', None) is not None:
       for o in self.objectValues():
-        if hasattr(aq_base(o), 'recursiveFlushActivity'):
+        if getattr(aq_base(o), 'recursiveFlushActivity', None) is not None:
           o.recursiveFlushActivity(invoke=invoke, **kw)
 
   security.declareProtected( permissions.View, 'hasActivity' )

Modified: erp5/trunk/products/CMFActivity/Activity/Queue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/Queue.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/Queue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/Queue.py Fri Mar  2 15:48:48 2007
@@ -143,7 +143,7 @@
                             going to be executed                                                        
     """
     try:
-      if activity_tool.unrestrictedTraverse(message.object_path) is None:
+      if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
         # Do not try to call methods on objects which do not exist
         LOG('WARNING ActivityTool', 0,
            'Object %s does not exist' % '/'.join(message.object_path))
@@ -214,27 +214,23 @@
 
   # Registration Management
   def registerActivityBuffer(self, activity_buffer):
-    class_name = self.__class__.__name__
-    setattr(activity_buffer, '_%s_message_list' % class_name, [])  
+    pass
 
   def isMessageRegistered(self, activity_buffer, activity_tool, m):
-    class_name = self.__class__.__name__
-    return m in getattr(activity_buffer, '_%s_message_list' % class_name)
+    message_list = activity_buffer.getMessageList(self)
+    return m in message_list
 
   def registerMessage(self, activity_buffer, activity_tool, m):
-    class_name = self.__class__.__name__
-    getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
+    message_list = activity_buffer.getMessageList(self)
+    message_list.append(m)
     m.is_registered = 1
 
   def unregisterMessage(self, activity_buffer, activity_tool, m):
     m.is_registered = 0
 
   def getRegisteredMessageList(self, activity_buffer, activity_tool):
-    class_name = self.__class__.__name__
-    if hasattr(activity_buffer, '_%s_message_list' % class_name):
-      return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
-    else:
-      return ()
+    message_list = activity_buffer.getMessageList(self)
+    return [m for m in message_list if m.is_registered]
 
   # Required for tests (time shift)
   def timeShift(self, activity_tool, delay):

Modified: erp5/trunk/products/CMFActivity/Activity/RAMDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMDict.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMDict.py Fri Mar  2 15:48:48 2007
@@ -63,18 +63,17 @@
         del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)]
 
   def registerActivityBuffer(self, activity_buffer):
-    class_name = self.__class__.__name__
-    setattr(activity_buffer, '_%s_message_list' % class_name, [])
-    setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
+    pass
 
   def isMessageRegistered(self, activity_buffer, activity_tool, m):
-    class_name = self.__class__.__name__
-    return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((tuple(m.object_path), m.method_id))
+    uid_set = activity_buffer.getUidSet(self)
+    return (tuple(m.object_path), m.method_id) in uid_set
 
   def registerMessage(self, activity_buffer, activity_tool, m):
-    class_name = self.__class__.__name__
-    getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
-    getattr(activity_buffer, '_%s_uid_dict' % class_name)[(tuple(m.object_path), m.method_id)] = 1
+    message_list = activity_buffer.getMessageList(self)
+    message_list.append(m)
+    uid_set = activity_buffer.getUidSet(self)
+    uid_set.add((tuple(m.object_path), m.method_id))
     m.is_registered = 1
 
   def dequeueMessage(self, activity_tool, processing_node):

Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Mar  2 15:48:48 2007
@@ -116,36 +116,28 @@
 
   # Registration management
   def registerActivityBuffer(self, activity_buffer):
-    class_name = self.__class__.__name__
-    setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
-    setattr(activity_buffer, '_%s_message_list' % class_name, [])
+    pass
 
   def isMessageRegistered(self, activity_buffer, activity_tool, m):
-    class_name = self.__class__.__name__
-    uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
-    return uid_dict.has_key((tuple(m.object_path), m.method_id))
+    uid_set = activity_buffer.getUidSet(self)
+    return (tuple(m.object_path), m.method_id) in uid_set
 
   def registerMessage(self, activity_buffer, activity_tool, m):
     m.is_registered = 1
-    class_name = self.__class__.__name__
-    uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
-    uid_dict[(tuple(m.object_path), m.method_id)] = 1
-    getattr(activity_buffer,'_%s_message_list' % class_name).append(m)
+    uid_set = activity_buffer.getUidSet(self)
+    uid_set.add((tuple(m.object_path), m.method_id))
+    message_list = activity_buffer.getMessageList(self)
+    message_list.append(m)
 
   def unregisterMessage(self, activity_buffer, activity_tool, m):
     m.is_registered = 0 # This prevents from inserting deleted messages into the queue
     class_name = self.__class__.__name__
-    uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
-    if uid_dict.has_key((tuple(m.object_path), m.method_id)):
-      del uid_dict[(tuple(m.object_path), m.method_id)]
+    uid_set = activity_buffer.getUidSet(self)
+    uid_set.discard((tuple(m.object_path), m.method_id))
 
   def getRegisteredMessageList(self, activity_buffer, activity_tool):
-    class_name = self.__class__.__name__
-    if hasattr(activity_buffer,'_%s_message_list' % class_name):
-      message_list = getattr(activity_buffer,'_%s_message_list' % class_name)
-      return [m for m in message_list if m.is_registered]
-    else:
-      return ()
+    message_list = activity_buffer.getMessageList(self)
+    return [m for m in message_list if m.is_registered]
 
   def getOrderValidationText(self, message):
     # Return an identifier of validators related to ordering.
@@ -456,7 +448,9 @@
             else:
               raise ActivityFlushError, (
                   'Could not validate %s on %s' % (m.method_id , path))
-          self.deleteMessage(activity_tool, m)
+
+      if len(result):
+        activity_tool.SQLDict_delMessage(uid = [line.uid for line in result])
 
   def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
     # YO: reading all lines might cause a deadlock

Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Mar  2 15:48:48 2007
@@ -72,7 +72,7 @@
   def prepareDeleteMessage(self, activity_tool, m):
     # Erase all messages in a single transaction
     #LOG("prepareDeleteMessage", 0, str(m.__dict__))
-    activity_tool.SQLQueue_delMessage(uid = m.uid)
+    activity_tool.SQLQueue_delMessage(uid = [m.uid])
 
   def dequeueMessage(self, activity_tool, processing_node):
     readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
@@ -147,7 +147,7 @@
 
 
       if m.is_executed:
-        activity_tool.SQLQueue_delMessage(uid=line.uid)  # Delete it
+        activity_tool.SQLQueue_delMessage(uid=[line.uid])  # Delete it
       else:
         try:
           # If not, abort transaction and start a new one
@@ -234,7 +234,9 @@
               # The message no longer exists
               raise ActivityFlushError, (
                   'The document %s does not exist' % path)
-          self.deleteMessage(activity_tool, m)
+
+      if len(result):
+        activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
 
   # def start(self, activity_tool, active_process=None):
   #   uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)

Modified: erp5/trunk/products/CMFActivity/ActivityBuffer.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityBuffer.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityBuffer.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityBuffer.py Fri Mar  2 15:48:48 2007
@@ -26,23 +26,29 @@
 from Shared.DC.ZRDB.TM import TM
 from zLOG import LOG, ERROR, INFO
 import sys
-from thread import allocate_lock, get_ident
+import threading
 
 try:
   from transaction import get as get_transaction
 except ImportError:
   pass
 
+# This variable is used to store thread-local buffered information.
+# This must be RAM-based, because the use of a volatile attribute does
+# not guarantee that the information persists until the end of a
+# transaction, but we need to assure that the information is accessible
+# for flushing activities. So the approach here is that information is
+# stored in RAM, and removed at _finish and _abort, so that the information
+# would not span over transactions.
+buffer_dict_lock = threading.Lock()
+buffer_dict = {}
+
 class ActivityBuffer(TM):
 
   _p_oid=_p_changed=_registered=None
 
   def __init__(self, activity_tool=None):
-    self._use_TM = self._transactions = 1
-    if self._use_TM:
-      self._tlock = allocate_lock()
-      self._tthread = None
-    self._lock = allocate_lock()
+    self.requires_prepare = 0
 
     # Directly store the activity tool as an attribute. At the beginning
     # the activity tool was stored as a part of the key in queued_activity and
@@ -57,18 +63,51 @@
     # so store only the required information.
     self._activity_tool_path = activity_tool.getPhysicalPath()
 
+    try:
+      buffer_dict_lock.acquire()
+      if self._activity_tool_path not in buffer_dict:
+        buffer_dict[self._activity_tool_path] = threading.local()
+    finally:
+      buffer_dict_lock.release()
+
+    # Create attributes only if they are not present.
+    buffer = self._getBuffer()
+    if not hasattr(buffer, 'queued_activity'):
+      buffer.queued_activity = []
+      buffer.flushed_activity = []
+      buffer.message_list_dict = {}
+      buffer.uid_set_dict = {}
+
+  def _getBuffer(self):
+    return buffer_dict[self._activity_tool_path]
+
+  def _clearBuffer(self):
+    buffer = self._getBuffer()
+    del buffer.queued_activity[:]
+    del buffer.flushed_activity[:]
+    buffer.message_list_dict.clear()
+    buffer.uid_set_dict.clear()
+
+  def getMessageList(self, activity):
+    buffer = self._getBuffer()
+    return buffer.message_list_dict.setdefault(activity, []) 
+
+  def getUidSet(self, activity):
+    buffer = self._getBuffer()
+    return buffer.uid_set_dict.setdefault(activity, set())
+
   # Keeps a list of messages to add and remove
   # at end of transaction
   def _begin(self, *ignored):
+    LOG('ActivityBuffer', 0, '_begin %r' % (self,))
     from ActivityTool import activity_list
-    self._tlock.acquire()
-    self._tthread = get_ident()
     self.requires_prepare = 1
     try:
-      self.queued_activity = []
-      self.flushed_activity = []
-      for activity in activity_list:              # Reset registration for each transaction
+
+      # Reset registration for each transaction.
+      for activity in activity_list:
         activity.registerActivityBuffer(self)
+
       # In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of
       # patching Trasaction.
       transaction = get_transaction()
@@ -79,56 +118,46 @@
     except:
       LOG('ActivityBuffer', ERROR, "exception during _begin",
           error=sys.exc_info())
-      self._tlock.release()
       raise
 
   def _finish(self, *ignored):
-    if not self._tlock.locked() or self._tthread != get_ident():
-      LOG('ActivityBuffer', INFO, "ignoring _finish")
-      return
+    LOG('ActivityBuffer', 0, '_finish %r' % (self,))
     try:
       try:
         # Try to push / delete all messages
-        for (activity, message) in self.flushed_activity:
-          #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id))
+        buffer = self._getBuffer()
+        for (activity, message) in buffer.flushed_activity:
           activity.finishDeleteMessage(self._activity_tool_path, message)
-        for (activity, message) in self.queued_activity:
-          #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id))
+        for (activity, message) in buffer.queued_activity:
           activity.finishQueueMessage(self._activity_tool_path, message)
       except:
         LOG('ActivityBuffer', ERROR, "exception during _finish",
             error=sys.exc_info())
         raise
     finally:
-      self._tlock.release()
+      self._clearBuffer()
 
   def _abort(self, *ignored):
-    if not self._tlock.locked() or self._tthread != get_ident():
-      LOG('ActivityBuffer', 0, "ignoring _abort")
-      return
-    self._tlock.release()
+    self._clearBuffer()
 
   def tpc_prepare(self, transaction, sub=None):
-    if sub is not None: # Do nothing if it is a subtransaction
+    # Do nothing if it is a subtransaction
+    if sub is not None:
       return
-    if not self.requires_prepare: return
+
+    if not self.requires_prepare:
+      return
+
     self.requires_prepare = 0
-    if not self._tlock.locked() or self._tthread != get_ident():
-      LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
-      return
     try:
       # Try to push / delete all messages
-      for (activity, message) in self.flushed_activity:
-        #LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id))
+      buffer = self._getBuffer()
+      for (activity, message) in buffer.flushed_activity:
         activity.prepareDeleteMessage(self._activity_tool, message)
       activity_dict = {}
-      for (activity, message) in self.queued_activity:
-        key = activity
-        if key not in activity_dict:
-          activity_dict[key] = []
-        activity_dict[key].append(message)
-      for key, message_list in activity_dict.items():
-        activity = key
+      for (activity, message) in buffer.queued_activity:
+        activity_dict.setdefault(activity, []).append(message)
+      for activity, message_list in activity_dict.iteritems():
         if hasattr(activity, 'prepareQueueMessageList'):
           activity.prepareQueueMessageList(self._activity_tool, message_list)
         else:
@@ -144,11 +173,13 @@
     # Activity is called to prevent queuing some messages (useful for example
     # to prevent reindexing objects multiple times)
     if not activity.isMessageRegistered(self, activity_tool, message):
-      self.queued_activity.append((activity, message))
+      buffer = self._getBuffer()
+      buffer.queued_activity.append((activity, message))
       # We register queued messages so that we can
       # unregister them
       activity.registerMessage(self, activity_tool, message)
 
   def deferredDeleteMessage(self, activity_tool, activity, message):
     self._register()
-    self.flushed_activity.append((activity, message))
+    buffer = self._getBuffer()
+    buffer.flushed_activity.append((activity, message))

Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Fri Mar  2 15:48:48 2007
@@ -641,15 +641,15 @@
       self._v_activity_buffer._register() # Required if called by flush, outside activate
       return activity.unregisterMessage(self._v_activity_buffer, self, message)
 
-    def flush(self, object, invoke=0, **kw):
+    def flush(self, obj, invoke=0, **kw):
       global is_initialized
       if not is_initialized: self.initialize()
       if getattr(self, '_v_activity_buffer', None) is None:
         self._v_activity_buffer = ActivityBuffer(activity_tool=self)
-      if type(object) is TupleType:
-        object_path = object
+      if isinstance(obj, tuple):
+        object_path = obj
       else:
-        object_path = object.getPhysicalPath()
+        object_path = obj.getPhysicalPath()
       for activity in activity_list:
         activity.flush(self, object_path, invoke=invoke, **kw)
 

Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql Fri Mar  2 15:48:48 2007
@@ -11,4 +11,5 @@
 DELETE FROM
   message_queue
 WHERE
-  uid = <dtml-sqlvar uid type="int">
+<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
+</dtml-in>




More information about the Erp5-report mailing list