[Erp5-report] r8153 - in /erp5/trunk/products/CMFActivity: ./ Activity/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Jun 23 13:35:06 CEST 2006
Author: yo
Date: Fri Jun 23 13:35:04 2006
New Revision: 8153
URL: http://svn.erp5.org?rev=8153&view=rev
Log:
Do not refer to the activity tool in _finish.
Instead, store the path in ActivityBuffer in __init__.
Make the activity_tool parameter of ActivityBuffer.__init__ mandatory.
Some performance tuning.
Modified:
erp5/trunk/products/CMFActivity/Activity/Queue.py
erp5/trunk/products/CMFActivity/Activity/RAMDict.py
erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
erp5/trunk/products/CMFActivity/ActivityBuffer.py
Modified: erp5/trunk/products/CMFActivity/Activity/Queue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/Queue.py?rev=8153&r1=8152&r2=8153&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/Queue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/Queue.py Fri Jun 23 13:35:04 2006
@@ -188,50 +188,50 @@
def getMessageList(self, activity_tool, processing_node=None,**kw):
return []
-
+
# Transaction Management
def prepareQueueMessage(self, activity_tool, m):
# Called to prepare transaction commit for queued messages
pass
-
- def finishQueueMessage(self, activity_tool, m):
+
+ def finishQueueMessage(self, activity_tool_path, m):
# Called to commit queued messages
pass
def prepareDeleteMessage(self, activity_tool, m):
# Called to prepare transaction commit for deleted messages
pass
-
- def finishDeleteMessage(self, activity_tool, m):
+
+ def finishDeleteMessage(self, activity_tool_path, m):
# Called to commit deleted messages
pass
-
+
# Registration Management
def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__
setattr(activity_buffer, '_%s_message_list' % class_name, [])
-
+
def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__
return m in getattr(activity_buffer, '_%s_message_list' % class_name)
-
+
def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__
getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
m.is_registered = 1
-
+
def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0
-
+
def getRegisteredMessageList(self, activity_buffer, activity_tool):
class_name = self.__class__.__name__
if hasattr(activity_buffer, '_%s_message_list' % class_name):
- return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
+ return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
else:
- return ()
-
- # Required for tests (time shift)
- def timeShift(self, activity_tool, delay):
+ return ()
+
+ # Required for tests (time shift)
+ def timeShift(self, activity_tool, delay):
"""
delay is provided in fractions of day
"""
Modified: erp5/trunk/products/CMFActivity/Activity/RAMDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMDict.py?rev=8153&r1=8152&r2=8153&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMDict.py Fri Jun 23 13:35:04 2006
@@ -50,20 +50,17 @@
Queue.__init__(self)
self.queue_dict = {}
- def getDict(self, activity_tool):
- path = activity_tool.getPhysicalPath()
- if not self.queue_dict.has_key(path):
- self.queue_dict[path] = {}
- return self.queue_dict[path]
-
- def finishQueueMessage(self, activity_tool, m):
+ def getDict(self, activity_tool_path):
+ return self.queue_dict.setdefault(activity_tool_path, {})
+
+ def finishQueueMessage(self, activity_tool_path, m):
if m.is_registered:
- self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] = m
+ self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)] = m
- def finishDeleteMessage(self, activity_tool, message):
- for key, m in self.getDict(activity_tool).items():
+ def finishDeleteMessage(self, activity_tool_path, message):
+ for key, m in self.getDict(activity_tool_path).items():
if m.object_path == message.object_path and m.method_id == message.method_id:
- del self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)]
+ del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)]
def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__
@@ -81,13 +78,14 @@
m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node):
- if len(self.getDict(activity_tool).keys()) is 0:
+ path = activity_tool.getPhysicalPath()
+ if len(self.getDict(path).keys()) is 0:
return 1 # Go to sleep
- for key, m in self.getDict(activity_tool).items():
+ for key, m in self.getDict(path).items():
if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m)
if m.is_executed:
- del self.getDict(activity_tool)[key]
+ del self.getDict(path)[key]
get_transaction().commit()
return 0
else:
@@ -99,9 +97,10 @@
if object is not None:
object_path = object.getPhysicalPath()
else:
- object_path = None
- active_process = kw.get('active_process', None)
- for m in self.getDict(activity_tool).values():
+ object_path = None
+ active_process = kw.get('active_process', None)
+ path = activity_tool.getPhysicalPath()
+ for m in self.getDict(path).values():
# Filter active process and path if defined
if active_process is None or m.active_process == active_process:
if object_path is None or m.object_path == object_path:
@@ -133,16 +132,17 @@
'The document %s does not exist' % path)
else:
method_dict[m.method_id] = 1
- activity_tool.unregisterMessage(self, m)
- else:
+ activity_tool.unregisterMessage(self, m)
+ else:
method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m)
# Parse each message in RAM dict
- for key, m in self.getDict(activity_tool).items():
+ path = activity_tool.getPhysicalPath()
+ for key, m in self.getDict(path).items():
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if not method_dict.has_key(m.method_id):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
- if invoke:
+ if invoke:
activity_tool.invoke(m)
if m.is_executed:
method_dict[m.method_id] = 1
@@ -150,15 +150,16 @@
else:
method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m)
- else:
+ else:
self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = []
- for m in self.getDict(activity_tool).values():
+ path = activity_tool.getPhysicalPath()
+ for m in self.getDict(path).values():
m.processing_node = 1
m.priority = 0
new_queue.append(m)
return new_queue
-
+
registerActivity(RAMDict)
Modified: erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMQueue.py?rev=8153&r1=8152&r2=8153&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMQueue.py Fri Jun 23 13:35:04 2006
@@ -43,23 +43,20 @@
Queue.__init__(self)
self.queue_dict = {}
self.last_uid = 0
-
- def getQueue(self, activity_tool):
- path = activity_tool.getPhysicalPath()
- if not self.queue_dict.has_key(path):
- self.queue_dict[path] = []
- return self.queue_dict[path]
-
- def finishQueueMessage(self, activity_tool, m):
+
+ def getQueue(self, activity_tool_path):
+ return self.queue_dict.setdefault(activity_tool_path, [])
+
+ def finishQueueMessage(self, activity_tool_path, m):
if m.is_registered:
# XXX - Some lock is required on this section
self.last_uid = self.last_uid + 1
m.uid = self.last_uid
- self.getQueue(activity_tool).append(m)
+ self.getQueue(activity_tool_path).append(m)
- def finishDeleteMessage(self, activity_tool, m):
+ def finishDeleteMessage(self, activity_tool_path, m):
i = 0
- queue = self.getQueue(activity_tool)
+ queue = self.getQueue(activity_tool_path)
for my_message in queue:
if my_message.uid == m.uid:
del queue[i]
@@ -67,7 +64,8 @@
i = i + 1
def dequeueMessage(self, activity_tool, processing_node):
- for m in self.getQueue(activity_tool):
+ path = activity_tool.getPhysicalPath()
+ for m in self.getQueue(path):
if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction
@@ -76,7 +74,7 @@
if m.is_executed:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction
- return 0 # Keep on ticking
+ return 0 # Keep on ticking
else:
# Start a new transaction and keep on to next message
get_transaction().commit()
@@ -88,8 +86,9 @@
object_path = object.getPhysicalPath()
else:
object_path = None
- active_process = kw.get('active_process', None)
- for m in self.getQueue(activity_tool):
+ active_process = kw.get('active_process', None)
+ path = activity_tool.getPhysicalPath()
+ for m in self.getQueue(path):
# Filter active process and path if defined
if active_process is None or m.active_process == active_process:
if object_path is None or m.object_path == object_path:
@@ -102,29 +101,31 @@
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if m.validate(self, activity_tool) is not VALID:
activity_tool.unregisterMessage(self, m) # Trash messages which are not validated (no error handling)
- else:
+ else:
if invoke:
activity_tool.invoke(m)
if m.is_executed:
activity_tool.unregisterMessage(self, m)
- else:
+ else:
activity_tool.unregisterMessage(self, m)
# Parse each message in queue
- for m in self.getQueue(activity_tool):
+ path = activity_tool.getPhysicalPath()
+ for m in self.getQueue(path):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
- else:
+ else:
if invoke:
activity_tool.invoke(m)
if m.is_executed:
self.deleteMessage(activity_tool, m) # Only delete if no error happens
- else:
+ else:
self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = []
- for m in self.getQueue(activity_tool):
+ path = activity_tool.getPhysicalPath()
+ for m in self.getQueue(path):
m.processing_node = 1
m.priority = 0
new_queue.append(m)
Modified: erp5/trunk/products/CMFActivity/ActivityBuffer.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityBuffer.py?rev=8153&r1=8152&r2=8153&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityBuffer.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityBuffer.py Fri Jun 23 13:35:04 2006
@@ -26,6 +26,7 @@
from Shared.DC.ZRDB.TM import TM
from zLOG import LOG, ERROR, INFO
import sys
+from thread import allocate_lock, get_ident
try:
from transaction import get as get_transaction
@@ -34,123 +35,120 @@
class ActivityBuffer(TM):
- _p_oid=_p_changed=_registered=None
+ _p_oid=_p_changed=_registered=None
- def __init__(self, activity_tool=None):
- from thread import allocate_lock
- self._use_TM = self._transactions = 1
- if self._use_TM:
- self._tlock = allocate_lock()
- self._tthread = None
- self._lock = allocate_lock()
- if activity_tool is not None:
- self._activity_tool = activity_tool
+ def __init__(self, activity_tool=None):
+ self._use_TM = self._transactions = 1
+ if self._use_TM:
+ self._tlock = allocate_lock()
+ self._tthread = None
+ self._lock = allocate_lock()
- # Keeps a list of messages to add and remove
- # at end of transaction
- def _begin(self, *ignored):
- from thread import get_ident
- from ActivityTool import activity_list
- self._tlock.acquire()
- self._tthread = get_ident()
- self.requires_prepare = 1
- try:
- self.queued_activity = []
- self.flushed_activity = []
- for activity in activity_list: # Reset registration for each transaction
- activity.registerActivityBuffer(self)
- # In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of
- # patching Trasaction.
- transaction = get_transaction()
- try:
- transaction.beforeCommitHook(self.tpc_prepare, transaction)
- except AttributeError:
- pass
- except:
- LOG('ActivityBuffer', ERROR, "exception during _begin",
- error=sys.exc_info())
- self._tlock.release()
- raise
+ # Directly store the activity tool as an attribute. At the beginning
+ # the activity tool was stored as a part of the key in queued_activity and
+ # in flushed_activity, but this is not nice because in that case we must
+ # use hash on it, and when there is no uid on activity tool, it is
+ # impossible to generate a new uid because acquisition is not available
+ # in the dictionary.
+ assert activity_tool is not None
+ self._activity_tool = activity_tool
- def _finish(self, *ignored):
- from thread import get_ident
- if not self._tlock.locked() or self._tthread != get_ident():
- LOG('ActivityBuffer', INFO, "ignoring _finish")
- return
- try:
- try:
- # Try to push / delete all messages
- for (activity, message) in self.flushed_activity:
- #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id))
- activity.finishDeleteMessage(self._activity_tool, message)
- for (activity, message) in self.queued_activity:
- #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id))
- activity.finishQueueMessage(self._activity_tool, message)
- except:
- LOG('ActivityBuffer', ERROR, "exception during _finish",
- error=sys.exc_info())
- raise
- finally:
- self._tlock.release()
+ # Referring to a persistent object is dangerous when finishing a transaction,
+ # so store only the required information.
+ self._activity_tool_path = activity_tool.getPhysicalPath()
- def _abort(self, *ignored):
- from thread import get_ident
- if not self._tlock.locked() or self._tthread != get_ident():
- LOG('ActivityBuffer', 0, "ignoring _abort")
- return
- self._tlock.release()
+ # Keeps a list of messages to add and remove
+ # at end of transaction
+ def _begin(self, *ignored):
+ from ActivityTool import activity_list
+ self._tlock.acquire()
+ self._tthread = get_ident()
+ self.requires_prepare = 1
+ try:
+ self.queued_activity = []
+ self.flushed_activity = []
+ for activity in activity_list: # Reset registration for each transaction
+ activity.registerActivityBuffer(self)
+ # In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of
+ # patching Transaction.
+ transaction = get_transaction()
+ try:
+ transaction.beforeCommitHook(self.tpc_prepare, transaction)
+ except AttributeError:
+ pass
+ except:
+ LOG('ActivityBuffer', ERROR, "exception during _begin",
+ error=sys.exc_info())
+ self._tlock.release()
+ raise
- def tpc_prepare(self, transaction, sub=None):
- if sub is not None: # Do nothing if it is a subtransaction
- return
- if not self.requires_prepare: return
- self.requires_prepare = 0
- from thread import get_ident
- if not self._tlock.locked() or self._tthread != get_ident():
- LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
- return
- try:
- # Try to push / delete all messages
- for (activity, message) in self.flushed_activity:
- #LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id))
- activity.prepareDeleteMessage(self._activity_tool, message)
- activity_dict = {}
- for (activity, message) in self.queued_activity:
- key = activity
- if key not in activity_dict:
- activity_dict[key] = []
- activity_dict[key].append(message)
- for key, message_list in activity_dict.items():
- activity = key
- if hasattr(activity, 'prepareQueueMessageList'):
- activity.prepareQueueMessageList(self._activity_tool, message_list)
- else:
- for message in message_list:
- activity.prepareQueueMessage(self._activity_tool, message)
- except:
- LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
- error=sys.exc_info())
- raise
+ def _finish(self, *ignored):
+ if not self._tlock.locked() or self._tthread != get_ident():
+ LOG('ActivityBuffer', INFO, "ignoring _finish")
+ return
+ try:
+ try:
+ # Try to push / delete all messages
+ for (activity, message) in self.flushed_activity:
+ #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id))
+ activity.finishDeleteMessage(self._activity_tool_path, message)
+ for (activity, message) in self.queued_activity:
+ #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id))
+ activity.finishQueueMessage(self._activity_tool_path, message)
+ except:
+ LOG('ActivityBuffer', ERROR, "exception during _finish",
+ error=sys.exc_info())
+ raise
+ finally:
+ self._tlock.release()
- def deferredQueueMessage(self, activity_tool, activity, message):
- self._register()
- # Directly store the activity tool as an attribute. At the beginning
- # the activity tool was stored as a part of the key in queued_activity and
- # in flushed_activity, but this is not nice because in that case we must
- # use hash on it, and when there is no uid on activity tool, it is
- # impossible to generate a new uid because acquisition is not available
- # in the dictionnary.
- if getattr(self,'_activity_tool',None) is None:
- self._activity_tool = activity_tool
- # Activity is called to prevent queuing some messages (useful for example
- # to prevent reindexing objects multiple times)
- if not activity.isMessageRegistered(self, activity_tool, message):
- self.queued_activity.append((activity, message))
- # We register queued messages so that we can
- # unregister them
- activity.registerMessage(self, activity_tool, message)
+ def _abort(self, *ignored):
+ if not self._tlock.locked() or self._tthread != get_ident():
+ LOG('ActivityBuffer', 0, "ignoring _abort")
+ return
+ self._tlock.release()
- def deferredDeleteMessage(self, activity_tool, activity, message):
- self._register()
- self.flushed_activity.append((activity, message))
+ def tpc_prepare(self, transaction, sub=None):
+ if sub is not None: # Do nothing if it is a subtransaction
+ return
+ if not self.requires_prepare: return
+ self.requires_prepare = 0
+ if not self._tlock.locked() or self._tthread != get_ident():
+ LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
+ return
+ try:
+ # Try to push / delete all messages
+ for (activity, message) in self.flushed_activity:
+ #LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id))
+ activity.prepareDeleteMessage(self._activity_tool, message)
+ activity_dict = {}
+ for (activity, message) in self.queued_activity:
+ key = activity
+ if key not in activity_dict:
+ activity_dict[key] = []
+ activity_dict[key].append(message)
+ for key, message_list in activity_dict.items():
+ activity = key
+ if hasattr(activity, 'prepareQueueMessageList'):
+ activity.prepareQueueMessageList(self._activity_tool, message_list)
+ else:
+ for message in message_list:
+ activity.prepareQueueMessage(self._activity_tool, message)
+ except:
+ LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
+ error=sys.exc_info())
+ raise
+ def deferredQueueMessage(self, activity_tool, activity, message):
+ self._register()
+ # Activity is called to prevent queuing some messages (useful for example
+ # to prevent reindexing objects multiple times)
+ if not activity.isMessageRegistered(self, activity_tool, message):
+ self.queued_activity.append((activity, message))
+ # We register queued messages so that we can
+ # unregister them
+ activity.registerMessage(self, activity_tool, message)
+
+ def deferredDeleteMessage(self, activity_tool, activity, message):
+ self._register()
+ self.flushed_activity.append((activity, message))
More information about the Erp5-report
mailing list