[Erp5-report] r13188 - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Mar 2 15:48:51 CET 2007
Author: yo
Date: Fri Mar 2 15:48:48 2007
New Revision: 13188
URL: http://svn.erp5.org?rev=13188&view=rev
Log:
Use RAM-based approach for ActivityBuffer instead of a volatile attribute. Clean up some mess.
Modified:
erp5/trunk/products/CMFActivity/ActiveObject.py
erp5/trunk/products/CMFActivity/Activity/Queue.py
erp5/trunk/products/CMFActivity/Activity/RAMDict.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/ActivityBuffer.py
erp5/trunk/products/CMFActivity/ActivityTool.py
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql
Modified: erp5/trunk/products/CMFActivity/ActiveObject.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActiveObject.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActiveObject.py (original)
+++ erp5/trunk/products/CMFActivity/ActiveObject.py Fri Mar 2 15:48:48 2007
@@ -134,9 +134,9 @@
def recursiveFlushActivity(self, invoke=0, **kw):
# flush all activities related to this object
self.flushActivity(invoke=invoke, **kw)
- if hasattr(aq_base(self), 'objectValues'):
+ if getattr(aq_base(self), 'objectValues', None) is not None:
for o in self.objectValues():
- if hasattr(aq_base(o), 'recursiveFlushActivity'):
+ if getattr(aq_base(o), 'recursiveFlushActivity', None) is not None:
o.recursiveFlushActivity(invoke=invoke, **kw)
security.declareProtected( permissions.View, 'hasActivity' )
Modified: erp5/trunk/products/CMFActivity/Activity/Queue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/Queue.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/Queue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/Queue.py Fri Mar 2 15:48:48 2007
@@ -143,7 +143,7 @@
going to be executed
"""
try:
- if activity_tool.unrestrictedTraverse(message.object_path) is None:
+ if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
# Do not try to call methods on objects which do not exist
LOG('WARNING ActivityTool', 0,
'Object %s does not exist' % '/'.join(message.object_path))
@@ -214,27 +214,23 @@
# Registration Management
def registerActivityBuffer(self, activity_buffer):
- class_name = self.__class__.__name__
- setattr(activity_buffer, '_%s_message_list' % class_name, [])
+ pass
def isMessageRegistered(self, activity_buffer, activity_tool, m):
- class_name = self.__class__.__name__
- return m in getattr(activity_buffer, '_%s_message_list' % class_name)
+ message_list = activity_buffer.getMessageList(self)
+ return m in message_list
def registerMessage(self, activity_buffer, activity_tool, m):
- class_name = self.__class__.__name__
- getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
+ message_list = activity_buffer.getMessageList(self)
+ message_list.append(m)
m.is_registered = 1
def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0
def getRegisteredMessageList(self, activity_buffer, activity_tool):
- class_name = self.__class__.__name__
- if hasattr(activity_buffer, '_%s_message_list' % class_name):
- return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
- else:
- return ()
+ message_list = activity_buffer.getMessageList(self)
+ return [m for m in message_list if m.is_registered]
# Required for tests (time shift)
def timeShift(self, activity_tool, delay):
Modified: erp5/trunk/products/CMFActivity/Activity/RAMDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMDict.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMDict.py Fri Mar 2 15:48:48 2007
@@ -63,18 +63,17 @@
del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)]
def registerActivityBuffer(self, activity_buffer):
- class_name = self.__class__.__name__
- setattr(activity_buffer, '_%s_message_list' % class_name, [])
- setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
+ pass
def isMessageRegistered(self, activity_buffer, activity_tool, m):
- class_name = self.__class__.__name__
- return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((tuple(m.object_path), m.method_id))
+ uid_set = activity_buffer.getUidSet(self)
+ return (tuple(m.object_path), m.method_id) in uid_set
def registerMessage(self, activity_buffer, activity_tool, m):
- class_name = self.__class__.__name__
- getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
- getattr(activity_buffer, '_%s_uid_dict' % class_name)[(tuple(m.object_path), m.method_id)] = 1
+ message_list = activity_buffer.getMessageList(self)
+ message_list.append(m)
+ uid_set = activity_buffer.getUidSet(self)
+ uid_set.add((tuple(m.object_path), m.method_id))
m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node):
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Mar 2 15:48:48 2007
@@ -116,36 +116,28 @@
# Registration management
def registerActivityBuffer(self, activity_buffer):
- class_name = self.__class__.__name__
- setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
- setattr(activity_buffer, '_%s_message_list' % class_name, [])
+ pass
def isMessageRegistered(self, activity_buffer, activity_tool, m):
- class_name = self.__class__.__name__
- uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
- return uid_dict.has_key((tuple(m.object_path), m.method_id))
+ uid_set = activity_buffer.getUidSet(self)
+ return (tuple(m.object_path), m.method_id) in uid_set
def registerMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 1
- class_name = self.__class__.__name__
- uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
- uid_dict[(tuple(m.object_path), m.method_id)] = 1
- getattr(activity_buffer,'_%s_message_list' % class_name).append(m)
+ uid_set = activity_buffer.getUidSet(self)
+ uid_set.add((tuple(m.object_path), m.method_id))
+ message_list = activity_buffer.getMessageList(self)
+ message_list.append(m)
def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0 # This prevents from inserting deleted messages into the queue
class_name = self.__class__.__name__
- uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
- if uid_dict.has_key((tuple(m.object_path), m.method_id)):
- del uid_dict[(tuple(m.object_path), m.method_id)]
+ uid_set = activity_buffer.getUidSet(self)
+ uid_set.discard((tuple(m.object_path), m.method_id))
def getRegisteredMessageList(self, activity_buffer, activity_tool):
- class_name = self.__class__.__name__
- if hasattr(activity_buffer,'_%s_message_list' % class_name):
- message_list = getattr(activity_buffer,'_%s_message_list' % class_name)
- return [m for m in message_list if m.is_registered]
- else:
- return ()
+ message_list = activity_buffer.getMessageList(self)
+ return [m for m in message_list if m.is_registered]
def getOrderValidationText(self, message):
# Return an identifier of validators related to ordering.
@@ -456,7 +448,9 @@
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
- self.deleteMessage(activity_tool, m)
+
+ if len(result):
+ activity_tool.SQLDict_delMessage(uid = [line.uid for line in result])
def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
# YO: reading all lines might cause a deadlock
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Mar 2 15:48:48 2007
@@ -72,7 +72,7 @@
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
#LOG("prepareDeleteMessage", 0, str(m.__dict__))
- activity_tool.SQLQueue_delMessage(uid = m.uid)
+ activity_tool.SQLQueue_delMessage(uid = [m.uid])
def dequeueMessage(self, activity_tool, processing_node):
readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
@@ -147,7 +147,7 @@
if m.is_executed:
- activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
+ activity_tool.SQLQueue_delMessage(uid=[line.uid]) # Delete it
else:
try:
# If not, abort transaction and start a new one
@@ -234,7 +234,9 @@
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
- self.deleteMessage(activity_tool, m)
+
+ if len(result):
+ activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
# def start(self, activity_tool, active_process=None):
# uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
Modified: erp5/trunk/products/CMFActivity/ActivityBuffer.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityBuffer.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityBuffer.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityBuffer.py Fri Mar 2 15:48:48 2007
@@ -26,23 +26,29 @@
from Shared.DC.ZRDB.TM import TM
from zLOG import LOG, ERROR, INFO
import sys
-from thread import allocate_lock, get_ident
+import threading
try:
from transaction import get as get_transaction
except ImportError:
pass
+# This variable is used to store thread-local buffered information.
+# This must be RAM-based, because the use of a volatile attribute does
+# not guarantee that the information persists until the end of a
+# transaction, but we need to assure that the information is accessible
+# for flushing activities. So the approach here is that information is
+# stored in RAM, and removed at _finish and _abort, so that the information
+# would not span over transactions.
+buffer_dict_lock = threading.Lock()
+buffer_dict = {}
+
class ActivityBuffer(TM):
_p_oid=_p_changed=_registered=None
def __init__(self, activity_tool=None):
- self._use_TM = self._transactions = 1
- if self._use_TM:
- self._tlock = allocate_lock()
- self._tthread = None
- self._lock = allocate_lock()
+ self.requires_prepare = 0
# Directly store the activity tool as an attribute. At the beginning
# the activity tool was stored as a part of the key in queued_activity and
@@ -57,18 +63,51 @@
# so store only the required information.
self._activity_tool_path = activity_tool.getPhysicalPath()
+ try:
+ buffer_dict_lock.acquire()
+ if self._activity_tool_path not in buffer_dict:
+ buffer_dict[self._activity_tool_path] = threading.local()
+ finally:
+ buffer_dict_lock.release()
+
+ # Create attributes only if they are not present.
+ buffer = self._getBuffer()
+ if not hasattr(buffer, 'queued_activity'):
+ buffer.queued_activity = []
+ buffer.flushed_activity = []
+ buffer.message_list_dict = {}
+ buffer.uid_set_dict = {}
+
+ def _getBuffer(self):
+ return buffer_dict[self._activity_tool_path]
+
+ def _clearBuffer(self):
+ buffer = self._getBuffer()
+ del buffer.queued_activity[:]
+ del buffer.flushed_activity[:]
+ buffer.message_list_dict.clear()
+ buffer.uid_set_dict.clear()
+
+ def getMessageList(self, activity):
+ buffer = self._getBuffer()
+ return buffer.message_list_dict.setdefault(activity, [])
+
+ def getUidSet(self, activity):
+ buffer = self._getBuffer()
+ return buffer.uid_set_dict.setdefault(activity, set())
+
# Keeps a list of messages to add and remove
# at end of transaction
def _begin(self, *ignored):
+ LOG('ActivityBuffer', 0, '_begin %r' % (self,))
from ActivityTool import activity_list
- self._tlock.acquire()
- self._tthread = get_ident()
self.requires_prepare = 1
try:
- self.queued_activity = []
- self.flushed_activity = []
- for activity in activity_list: # Reset registration for each transaction
+
+ # Reset registration for each transaction.
+ for activity in activity_list:
activity.registerActivityBuffer(self)
+
# In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of
# patching Trasaction.
transaction = get_transaction()
@@ -79,56 +118,46 @@
except:
LOG('ActivityBuffer', ERROR, "exception during _begin",
error=sys.exc_info())
- self._tlock.release()
raise
def _finish(self, *ignored):
- if not self._tlock.locked() or self._tthread != get_ident():
- LOG('ActivityBuffer', INFO, "ignoring _finish")
- return
+ LOG('ActivityBuffer', 0, '_finish %r' % (self,))
try:
try:
# Try to push / delete all messages
- for (activity, message) in self.flushed_activity:
- #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id))
+ buffer = self._getBuffer()
+ for (activity, message) in buffer.flushed_activity:
activity.finishDeleteMessage(self._activity_tool_path, message)
- for (activity, message) in self.queued_activity:
- #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id))
+ for (activity, message) in buffer.queued_activity:
activity.finishQueueMessage(self._activity_tool_path, message)
except:
LOG('ActivityBuffer', ERROR, "exception during _finish",
error=sys.exc_info())
raise
finally:
- self._tlock.release()
+ self._clearBuffer()
def _abort(self, *ignored):
- if not self._tlock.locked() or self._tthread != get_ident():
- LOG('ActivityBuffer', 0, "ignoring _abort")
- return
- self._tlock.release()
+ self._clearBuffer()
def tpc_prepare(self, transaction, sub=None):
- if sub is not None: # Do nothing if it is a subtransaction
+ # Do nothing if it is a subtransaction
+ if sub is not None:
return
- if not self.requires_prepare: return
+
+ if not self.requires_prepare:
+ return
+
self.requires_prepare = 0
- if not self._tlock.locked() or self._tthread != get_ident():
- LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
- return
try:
# Try to push / delete all messages
- for (activity, message) in self.flushed_activity:
- #LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id))
+ buffer = self._getBuffer()
+ for (activity, message) in buffer.flushed_activity:
activity.prepareDeleteMessage(self._activity_tool, message)
activity_dict = {}
- for (activity, message) in self.queued_activity:
- key = activity
- if key not in activity_dict:
- activity_dict[key] = []
- activity_dict[key].append(message)
- for key, message_list in activity_dict.items():
- activity = key
+ for (activity, message) in buffer.queued_activity:
+ activity_dict.setdefault(activity, []).append(message)
+ for activity, message_list in activity_dict.iteritems():
if hasattr(activity, 'prepareQueueMessageList'):
activity.prepareQueueMessageList(self._activity_tool, message_list)
else:
@@ -144,11 +173,13 @@
# Activity is called to prevent queuing some messages (useful for example
# to prevent reindexing objects multiple times)
if not activity.isMessageRegistered(self, activity_tool, message):
- self.queued_activity.append((activity, message))
+ buffer = self._getBuffer()
+ buffer.queued_activity.append((activity, message))
# We register queued messages so that we can
# unregister them
activity.registerMessage(self, activity_tool, message)
def deferredDeleteMessage(self, activity_tool, activity, message):
self._register()
- self.flushed_activity.append((activity, message))
+ buffer = self._getBuffer()
+ buffer.flushed_activity.append((activity, message))
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Fri Mar 2 15:48:48 2007
@@ -641,15 +641,15 @@
self._v_activity_buffer._register() # Required if called by flush, outside activate
return activity.unregisterMessage(self._v_activity_buffer, self, message)
- def flush(self, object, invoke=0, **kw):
+ def flush(self, obj, invoke=0, **kw):
global is_initialized
if not is_initialized: self.initialize()
if getattr(self, '_v_activity_buffer', None) is None:
self._v_activity_buffer = ActivityBuffer(activity_tool=self)
- if type(object) is TupleType:
- object_path = object
+ if isinstance(obj, tuple):
+ object_path = obj
else:
- object_path = object.getPhysicalPath()
+ object_path = obj.getPhysicalPath()
for activity in activity_list:
activity.flush(self, object_path, invoke=invoke, **kw)
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql?rev=13188&r1=13187&r2=13188&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_delMessage.zsql Fri Mar 2 15:48:48 2007
@@ -11,4 +11,5 @@
DELETE FROM
message_queue
WHERE
- uid = <dtml-sqlvar uid type="int">
+<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
+</dtml-in>
More information about the Erp5-report
mailing list