[Erp5-report] r17102 - in /erp5/trunk/products/CMFActivity: ActivityBuffer.py ActivityTool.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Oct 22 18:30:51 CEST 2007
Author: vincent
Date: Mon Oct 22 18:30:50 2007
New Revision: 17102
URL: http://svn.erp5.org?rev=17102&view=rev
Log:
Remove usage of volatile parameters to access the ActivityBuffer on ActivityTool.
- implies removing the global dict used in ActivityBuffer to avoid duplicated activity insertion.
- implies that ActivityBuffer is bound to a thread (because of get_ident) and no longer to a connection (because volatile attributes are bound to a connection), so persistent objects must not be held outside transaction scope (the maximum scope at which a connection is bound to a thread)
- implies modifying hook registration so that portal_activities is passed as a parameter to tpc_prepare
- implies partially overriding the TM._register method to take activity_tool as a parameter
Modified:
erp5/trunk/products/CMFActivity/ActivityBuffer.py
erp5/trunk/products/CMFActivity/ActivityTool.py
Modified: erp5/trunk/products/CMFActivity/ActivityBuffer.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityBuffer.py?rev=17102&r1=17101&r2=17102&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityBuffer.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityBuffer.py Mon Oct 22 18:30:50 2007
@@ -37,16 +37,6 @@
if not hasattr(globals()['__builtins__'], 'set'):
from sets import Set as set
-# This variable is used to store thread-local buffered information.
-# This must be RAM-based, because the use of a volatile attribute does
-# not guarantee that the information persists until the end of a
-# transaction, but we need to assure that the information is accessible
-# for flushing activities. So the approach here is that information is
-# stored in RAM, and removed at _finish and _abort, so that the information
-# would not span over transactions.
-buffer_dict_lock = threading.Lock()
-buffer_dict = {}
-
class ActivityBuffer(TM):
_p_oid=_p_changed=_registered=None
@@ -54,28 +44,8 @@
def __init__(self, activity_tool=None):
self.requires_prepare = 0
- # Directly store the activity tool as an attribute. At the beginning
- # the activity tool was stored as a part of the key in queued_activity and
- # in flushed_activity, but this is not nice because in that case we must
- # use hash on it, and when there is no uid on activity tool, it is
- # impossible to generate a new uid because acquisition is not available
- # in the dictionary.
- assert activity_tool is not None
- self._activity_tool = activity_tool
-
- # Referring to a persistent object is dangerous when finishing a transaction,
- # so store only the required information.
- self._activity_tool_path = activity_tool.getPhysicalPath()
-
- try:
- buffer_dict_lock.acquire()
- if self._activity_tool_path not in buffer_dict:
- buffer_dict[self._activity_tool_path] = threading.local()
- finally:
- buffer_dict_lock.release()
-
def _getBuffer(self):
- buffer = buffer_dict[self._activity_tool_path]
+ buffer = self
# Create attributes only if they are not present.
if not hasattr(buffer, 'queued_activity'):
buffer.queued_activity = []
@@ -99,9 +69,13 @@
buffer = self._getBuffer()
return buffer.uid_set_dict.setdefault(activity, set())
+ def _register(self, activity_tool):
+ self._beginAndHook(activity_tool)
+ TM._register(self)
+
# Keeps a list of messages to add and remove
# at end of transaction
- def _begin(self, *ignored):
+ def _beginAndHook(self, activity_tool):
# LOG('ActivityBuffer', 0, '_begin %r' % (self,))
from ActivityTool import activity_list
self.requires_prepare = 1
@@ -115,7 +89,7 @@
# patching Trasaction.
transaction = get_transaction()
try:
- transaction.beforeCommitHook(self.tpc_prepare, transaction)
+ transaction.beforeCommitHook(self.tpc_prepare, transaction, activity_tool=activity_tool)
except AttributeError:
pass
except:
@@ -143,7 +117,9 @@
def _abort(self, *ignored):
self._clearBuffer()
- def tpc_prepare(self, transaction, sub=None):
+ def tpc_prepare(self, transaction, sub=None, activity_tool=None):
+ assert activity_tool is not None
+ self._activity_tool_path = activity_tool.getPhysicalPath()
# Do nothing if it is a subtransaction
if sub is not None:
return
@@ -156,23 +132,23 @@
# Try to push / delete all messages
buffer = self._getBuffer()
for (activity, message) in buffer.flushed_activity:
- activity.prepareDeleteMessage(self._activity_tool, message)
+ activity.prepareDeleteMessage(activity_tool, message)
activity_dict = {}
for (activity, message) in buffer.queued_activity:
activity_dict.setdefault(activity, []).append(message)
for activity, message_list in activity_dict.iteritems():
if hasattr(activity, 'prepareQueueMessageList'):
- activity.prepareQueueMessageList(self._activity_tool, message_list)
+ activity.prepareQueueMessageList(activity_tool, message_list)
else:
for message in message_list:
- activity.prepareQueueMessage(self._activity_tool, message)
+ activity.prepareQueueMessage(activity_tool, message)
except:
LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
error=sys.exc_info())
raise
def deferredQueueMessage(self, activity_tool, activity, message):
- self._register()
+ self._register(activity_tool)
# Activity is called to prevent queuing some messages (useful for example
# to prevent reindexing objects multiple times)
if not activity.isMessageRegistered(self, activity_tool, message):
@@ -183,7 +159,7 @@
activity.registerMessage(self, activity_tool, message)
def deferredDeleteMessage(self, activity_tool, activity, message):
- self._register()
+ self._register(activity_tool)
buffer = self._getBuffer()
buffer.flushed_activity.append((activity, message))
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=17102&r1=17101&r2=17102&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Mon Oct 22 18:30:50 2007
@@ -80,6 +80,13 @@
# Activity Registration
activity_dict = {}
activity_list = []
+
+# Here go ActivityBuffer instances
+# Structure:
+# global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
+global_activity_buffer = {}
+from thread import get_ident, allocate_lock
+global_activity_buffer_lock = allocate_lock()
def registerActivity(activity):
# Must be rewritten to register
@@ -680,40 +687,76 @@
return 1
return 0
+ def getActivityBuffer(self, create_if_not_found=True):
+ """
+ Get activtity buffer for this thread for this activity tool.
+ If no activity buffer is found at lowest level and create_if_not_found
+ is True, create one.
+ Intermediate level is unconditionaly created if non existant because
+ chances are it will be used in the instance life.
+ Lock is held when checking for intermediate level existance
+ because:
+ - intermediate level dict must not be created in 2 threads at the
+ same time, since one creation would destroy the existing one.
+ It's released after that step because:
+ - lower level access is at thread scope, thus by definition there
+ can be only one access at a time to a key
+ - GIL protects us when accessing python instances
+ """
+ global global_activity_buffer
+ global global_activity_buffer_lock
+ assert getattr(self, 'aq_self', None) is not None
+ my_instance_key = self.getPhysicalPath()
+ my_thread_key = get_ident()
+ global_activity_buffer_lock.acquire()
+ try:
+ if my_instance_key not in global_activity_buffer:
+ global_activity_buffer[my_instance_key] = {}
+ finally:
+ global_activity_buffer_lock.release()
+ thread_activity_buffer = global_activity_buffer[my_instance_key]
+ if my_thread_key not in thread_activity_buffer:
+ if create_if_not_found:
+ buffer = ActivityBuffer(activity_tool=self)
+ else:
+ buffer = None
+ thread_activity_buffer[my_thread_key] = buffer
+ activity_buffer = thread_activity_buffer[my_thread_key]
+ return activity_buffer
+
security.declarePrivate('activateObject')
def activateObject(self, object, activity, active_process, **kw):
global is_initialized
if not is_initialized: self.initialize()
- if getattr(self, '_v_activity_buffer', None) is None:
- self._v_activity_buffer = ActivityBuffer(activity_tool=self)
+ self.getActivityBuffer()
return ActiveWrapper(object, activity, active_process, **kw)
def deferredQueueMessage(self, activity, message):
- self._v_activity_buffer.deferredQueueMessage(self, activity, message)
+ activity_buffer = self.getActivityBuffer()
+ activity_buffer.deferredQueueMessage(self, activity, message)
def deferredDeleteMessage(self, activity, message):
- if getattr(self, '_v_activity_buffer', None) is None:
- self._v_activity_buffer = ActivityBuffer(activity_tool=self)
- self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
+ activity_buffer = self.getActivityBuffer()
+ activity_buffer.deferredDeleteMessage(self, activity, message)
def getRegisteredMessageList(self, activity):
- activity_buffer = getattr(self, '_v_activity_buffer', None)
+ activity_buffer = self.getActivityBuffer(create_if_not_found=False)
if activity_buffer is not None:
- activity_buffer._register() # This is required if flush flush is called outside activate
- return activity.getRegisteredMessageList(self._v_activity_buffer,
+ #activity_buffer._register() # This is required if flush flush is called outside activate
+ return activity.getRegisteredMessageList(activity_buffer,
aq_inner(self))
else:
return []
def unregisterMessage(self, activity, message):
- self._v_activity_buffer._register() # Required if called by flush, outside activate
- return activity.unregisterMessage(self._v_activity_buffer, aq_inner(self), message)
+ activity_buffer = self.getActivityBuffer()
+ #activity_buffer._register()
+ return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
def flush(self, obj, invoke=0, **kw):
global is_initialized
if not is_initialized: self.initialize()
- if getattr(self, '_v_activity_buffer', None) is None:
- self._v_activity_buffer = ActivityBuffer(activity_tool=self)
+ self.getActivityBuffer()
if isinstance(obj, tuple):
object_path = obj
else:
@@ -840,8 +883,7 @@
# Some Security Cheking should be made here XXX
global is_initialized
if not is_initialized: self.initialize()
- if getattr(self, '_v_activity_buffer', None) is None:
- self._v_activity_buffer = ActivityBuffer(activity_tool=self)
+ self.getActivityBuffer()
activity_dict[activity].queueMessage(aq_inner(self),
Message(path, active_process, activity_kw, method_id, args, kw))
More information about the Erp5-report
mailing list