[Erp5-report] r17102 - in /erp5/trunk/products/CMFActivity: ActivityBuffer.py ActivityTool.py

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Oct 22 18:30:51 CEST 2007


Author: vincent
Date: Mon Oct 22 18:30:50 2007
New Revision: 17102

URL: http://svn.erp5.org?rev=17102&view=rev
Log:
Remove usage of volatile attributes to access the ActivityBuffer on ActivityTool.
 - implies removing the global dict used in ActivityBuffer to avoid duplicated activity insertion.
 - implies that ActivityBuffer is bound to a thread (because of get_ident) and no longer to a connection (because a volatile attribute is bound to a connection), so persistent objects must not be held outside transaction scope (the maximum scope at which a connection is bound to a thread)
   - implies modifying hook registration so that portal_activities is passed as a parameter to tpc_prepare
     - implies partially overriding the TM._register method to take activity_tool as a parameter

Modified:
    erp5/trunk/products/CMFActivity/ActivityBuffer.py
    erp5/trunk/products/CMFActivity/ActivityTool.py

Modified: erp5/trunk/products/CMFActivity/ActivityBuffer.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityBuffer.py?rev=17102&r1=17101&r2=17102&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityBuffer.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityBuffer.py Mon Oct 22 18:30:50 2007
@@ -37,16 +37,6 @@
 if not hasattr(globals()['__builtins__'], 'set'):
   from sets import Set as set
 
-# This variable is used to store thread-local buffered information.
-# This must be RAM-based, because the use of a volatile attribute does
-# not guarantee that the information persists until the end of a
-# transaction, but we need to assure that the information is accessible
-# for flushing activities. So the approach here is that information is
-# stored in RAM, and removed at _finish and _abort, so that the information
-# would not span over transactions.
-buffer_dict_lock = threading.Lock()
-buffer_dict = {}
-
 class ActivityBuffer(TM):
 
   _p_oid=_p_changed=_registered=None
@@ -54,28 +44,8 @@
   def __init__(self, activity_tool=None):
     self.requires_prepare = 0
 
-    # Directly store the activity tool as an attribute. At the beginning
-    # the activity tool was stored as a part of the key in queued_activity and
-    # in flushed_activity, but this is not nice because in that case we must
-    # use hash on it, and when there is no uid on activity tool, it is
-    # impossible to generate a new uid because acquisition is not available
-    # in the dictionary.
-    assert activity_tool is not None
-    self._activity_tool = activity_tool
-
-    # Referring to a persistent object is dangerous when finishing a transaction,
-    # so store only the required information.
-    self._activity_tool_path = activity_tool.getPhysicalPath()
-
-    try:
-      buffer_dict_lock.acquire()
-      if self._activity_tool_path not in buffer_dict:
-        buffer_dict[self._activity_tool_path] = threading.local()
-    finally:
-      buffer_dict_lock.release()
-
   def _getBuffer(self):
-    buffer = buffer_dict[self._activity_tool_path]
+    buffer = self
     # Create attributes only if they are not present.
     if not hasattr(buffer, 'queued_activity'):
       buffer.queued_activity = []
@@ -99,9 +69,13 @@
     buffer = self._getBuffer()
     return buffer.uid_set_dict.setdefault(activity, set())
 
+  def _register(self, activity_tool):
+    self._beginAndHook(activity_tool)
+    TM._register(self)
+
   # Keeps a list of messages to add and remove
   # at end of transaction
-  def _begin(self, *ignored):
+  def _beginAndHook(self, activity_tool):
     # LOG('ActivityBuffer', 0, '_begin %r' % (self,))
     from ActivityTool import activity_list
     self.requires_prepare = 1
@@ -115,7 +89,7 @@
       # patching Trasaction.
       transaction = get_transaction()
       try:
-        transaction.beforeCommitHook(self.tpc_prepare, transaction)
+        transaction.beforeCommitHook(self.tpc_prepare, transaction, activity_tool=activity_tool)
       except AttributeError:
         pass
     except:
@@ -143,7 +117,9 @@
   def _abort(self, *ignored):
     self._clearBuffer()
 
-  def tpc_prepare(self, transaction, sub=None):
+  def tpc_prepare(self, transaction, sub=None, activity_tool=None):
+    assert activity_tool is not None
+    self._activity_tool_path = activity_tool.getPhysicalPath()
     # Do nothing if it is a subtransaction
     if sub is not None:
       return
@@ -156,23 +132,23 @@
       # Try to push / delete all messages
       buffer = self._getBuffer()
       for (activity, message) in buffer.flushed_activity:
-        activity.prepareDeleteMessage(self._activity_tool, message)
+        activity.prepareDeleteMessage(activity_tool, message)
       activity_dict = {}
       for (activity, message) in buffer.queued_activity:
         activity_dict.setdefault(activity, []).append(message)
       for activity, message_list in activity_dict.iteritems():
         if hasattr(activity, 'prepareQueueMessageList'):
-          activity.prepareQueueMessageList(self._activity_tool, message_list)
+          activity.prepareQueueMessageList(activity_tool, message_list)
         else:
           for message in message_list:
-            activity.prepareQueueMessage(self._activity_tool, message)
+            activity.prepareQueueMessage(activity_tool, message)
     except:
       LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
           error=sys.exc_info())
       raise
 
   def deferredQueueMessage(self, activity_tool, activity, message):
-    self._register()
+    self._register(activity_tool)
     # Activity is called to prevent queuing some messages (useful for example
     # to prevent reindexing objects multiple times)
     if not activity.isMessageRegistered(self, activity_tool, message):
@@ -183,7 +159,7 @@
       activity.registerMessage(self, activity_tool, message)
 
   def deferredDeleteMessage(self, activity_tool, activity, message):
-    self._register()
+    self._register(activity_tool)
     buffer = self._getBuffer()
     buffer.flushed_activity.append((activity, message))
 

Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=17102&r1=17101&r2=17102&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Mon Oct 22 18:30:50 2007
@@ -80,6 +80,13 @@
 # Activity Registration
 activity_dict = {}
 activity_list = []
+
+# Here go ActivityBuffer instances
+# Structure:
+#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
+global_activity_buffer = {}
+from thread import get_ident, allocate_lock
+global_activity_buffer_lock = allocate_lock()
 
 def registerActivity(activity):
   # Must be rewritten to register
@@ -680,40 +687,76 @@
           return 1
       return 0
 
+    def getActivityBuffer(self, create_if_not_found=True):
+      """
+        Get activtity buffer for this thread for this activity tool.
+        If no activity buffer is found at lowest level and create_if_not_found
+        is True, create one.
+        Intermediate level is unconditionaly created if non existant because
+        chances are it will be used in the instance life.
+        Lock is held when checking for intermediate level existance
+        because:
+         - intermediate level dict must not be created in 2 threads at the
+           same time, since one creation would destroy the existing one.
+        It's released after that step because:
+         - lower level access is at thread scope, thus by definition there
+           can be only one access at a time to a key
+         - GIL protects us when accessing python instances
+      """
+      global global_activity_buffer
+      global global_activity_buffer_lock
+      assert getattr(self, 'aq_self', None) is not None
+      my_instance_key = self.getPhysicalPath()
+      my_thread_key = get_ident()
+      global_activity_buffer_lock.acquire()
+      try:
+        if my_instance_key not in global_activity_buffer:
+          global_activity_buffer[my_instance_key] = {}
+      finally:
+        global_activity_buffer_lock.release()
+      thread_activity_buffer = global_activity_buffer[my_instance_key]
+      if my_thread_key not in thread_activity_buffer:
+        if create_if_not_found:
+          buffer = ActivityBuffer(activity_tool=self)
+        else:
+          buffer = None
+        thread_activity_buffer[my_thread_key] = buffer
+      activity_buffer = thread_activity_buffer[my_thread_key]
+      return activity_buffer
+
     security.declarePrivate('activateObject')
     def activateObject(self, object, activity, active_process, **kw):
       global is_initialized
       if not is_initialized: self.initialize()
-      if getattr(self, '_v_activity_buffer', None) is None:
-        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
+      self.getActivityBuffer()
       return ActiveWrapper(object, activity, active_process, **kw)
 
     def deferredQueueMessage(self, activity, message):
-      self._v_activity_buffer.deferredQueueMessage(self, activity, message)
+      activity_buffer = self.getActivityBuffer()
+      activity_buffer.deferredQueueMessage(self, activity, message)
 
     def deferredDeleteMessage(self, activity, message):
-      if getattr(self, '_v_activity_buffer', None) is None:
-        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
-      self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
+      activity_buffer = self.getActivityBuffer()
+      activity_buffer.deferredDeleteMessage(self, activity, message)
 
     def getRegisteredMessageList(self, activity):
-      activity_buffer = getattr(self, '_v_activity_buffer', None)
+      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
       if activity_buffer is not None:
-        activity_buffer._register() # This is required if flush flush is called outside activate
-        return activity.getRegisteredMessageList(self._v_activity_buffer,
+        #activity_buffer._register() # This is required if flush flush is called outside activate
+        return activity.getRegisteredMessageList(activity_buffer,
                                                  aq_inner(self))
       else:
         return []
 
     def unregisterMessage(self, activity, message):
-      self._v_activity_buffer._register() # Required if called by flush, outside activate
-      return activity.unregisterMessage(self._v_activity_buffer, aq_inner(self), message)
+      activity_buffer = self.getActivityBuffer()
+      #activity_buffer._register()
+      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
 
     def flush(self, obj, invoke=0, **kw):
       global is_initialized
       if not is_initialized: self.initialize()
-      if getattr(self, '_v_activity_buffer', None) is None:
-        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
+      self.getActivityBuffer()
       if isinstance(obj, tuple):
         object_path = obj
       else:
@@ -840,8 +883,7 @@
       # Some Security Cheking should be made here XXX
       global is_initialized
       if not is_initialized: self.initialize()
-      if getattr(self, '_v_activity_buffer', None) is None:
-        self._v_activity_buffer = ActivityBuffer(activity_tool=self)
+      self.getActivityBuffer()
       activity_dict[activity].queueMessage(aq_inner(self),
         Message(path, active_process, activity_kw, method_id, args, kw))
 




More information about the Erp5-report mailing list