[Erp5-report] r14039 - in /erp5/trunk/products/CMFActivity: ./ Activity/ skins/activity/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Apr 11 03:22:01 CEST 2007
Author: yo
Date: Wed Apr 11 03:21:56 2007
New Revision: 14039
URL: http://svn.erp5.org?rev=14039&view=rev
Log:
This big change optimizes the scheduling of active objects
and fixes some bugs.
The basic idea is to track a dependency graph to find executable
messages quickly. This makes the activity system far more efficient,
when you have many inter-dependent messages queued in the tables.
Also, this obsoletes the time shifting in the schedulers,
as executable messages can be found in a more efficient manner.
So the activity parameter "at_date" should now work as expected.
The validate methods in Activities now return a
list of message objects instead of a boolean value. Such
a list contains the messages that a given message
depends on.
The validate method in Message accepts a new optional
parameter, check_order_validation, to indicate whether
order validation should be performed. The default behavior
has not changed.
getDependentMessageList has been added to ActivityTool, Queue
and Message. This method collects the dependent messages for
a given message from all activities.
There are some other subtle changes. Look at the diffs for
more details.
Modified:
erp5/trunk/products/CMFActivity/Activity/Queue.py
erp5/trunk/products/CMFActivity/Activity/RAMDict.py
erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
erp5/trunk/products/CMFActivity/ActivityTool.py
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
Modified: erp5/trunk/products/CMFActivity/Activity/Queue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/Queue.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/Queue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/Queue.py Wed Apr 11 03:21:56 2007
@@ -26,12 +26,17 @@
#
##############################################################################
-import pickle, sys
-from Acquisition import aq_base
+import cPickle, sys
from DateTime import DateTime
-from Products.CMFActivity.ActivityTool import Message
-from zLOG import LOG
+from zLOG import LOG, WARNING, ERROR
from ZODB.POSException import ConflictError
+import sha
+from cStringIO import StringIO
+
+try:
+ from transaction import get as get_transaction
+except ImportError:
+ pass
# Error values for message validation
EXCEPTION = -1
@@ -93,14 +98,14 @@
def queueMessage(self, activity_tool, m):
activity_tool.deferredQueueMessage(self, m)
-
+
def deleteMessage(self, activity_tool, m):
if not getattr(m, 'is_deleted', 0):
# We try not to delete twice
# However this can not be garanteed in the case of messages loaded from SQL
activity_tool.deferredDeleteMessage(self, m)
m.is_deleted = 1
-
+
def dequeueMessage(self, activity_tool, processing_node):
pass
@@ -122,45 +127,102 @@
self.is_awake[processing_node] = 0
self.is_alive[processing_node] = 0
- def validate(self, activity_tool, message, **kw):
+ def validate(self, activity_tool, message, check_order_validation=1, **kw):
"""
This is the place where activity semantics is implemented
**kw contains all parameters which allow to implement synchronisation,
constraints, delays, etc.
-
+
Standard synchronisation parameters:
-
+
after_method_id -- never validate message if after_method_id
is in the list of methods which are
going to be executed
-
+
after_message_uid -- never validate message if after_message_uid
is in the list of messages which are
going to be executed
-
+
after_path -- never validate message if after_path
is in the list of path which are
- going to be executed
+ going to be executed
"""
try:
if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
# Do not try to call methods on objects which do not exist
- LOG('WARNING ActivityTool', 0,
+ LOG('CMFActivity', WARNING,
'Object %s does not exist' % '/'.join(message.object_path))
return INVALID_PATH
- for k, v in kw.items():
- if activity_tool.validateOrder(message, k, v):
- return INVALID_ORDER
+ if check_order_validation:
+ for k, v in kw.iteritems():
+ if activity_tool.validateOrder(message, k, v):
+ return INVALID_ORDER
except ConflictError:
raise
except:
- LOG('WARNING ActivityTool', 0,
+ LOG('CMFActivity', WARNING,
'Validation of Object %s raised exception' % '/'.join(message.object_path),
error=sys.exc_info())
# Do not try to call methods on objects which cause errors
return EXCEPTION
return VALID
+ def getDependentMessageList(self, activity_tool, message, **kw):
+ message_list = []
+ for k, v in kw.iteritems():
+ result = activity_tool.getDependentMessageList(message, k, v)
+ if result:
+ message_list.extend(result)
+ return message_list
+
+ def getExecutableMessageList(self, activity_tool, message, message_dict,
+ validation_text_dict):
+ """Get messages which have no dependent message, and store them in the dictionary.
+
+ If the passed message itself is executable, simply store only that message.
+ Otherwise, try to find at least one message executable from dependent messages.
+
+ This may result in no new message, if all dependent messages are already present
+ in the dictionary, if all dependent messages are in different activities, or if
+ the message has a circular dependency.
+
+ The validation text dictionary is used only to cache the results of validations,
+ in order to reduce the number of SQL queries.
+ """
+ if message.uid in message_dict:
+ # Nothing to do. But detect a circular dependency.
+ if message_dict[message.uid] is None:
+ LOG('CMFActivity', ERROR,
+ 'message uid %r has a circular dependency' % (message.uid,))
+ return
+
+ cached_result = validation_text_dict.get(message.order_validation_text)
+ if cached_result is None:
+ message_list = message.getDependentMessageList(self, activity_tool)
+ get_transaction().commit() # Release locks.
+ if message_list:
+ # The result is not empty, so this message is not executable.
+ validation_text_dict[message.order_validation_text] = 0
+ now_date = DateTime()
+ for activity, m in message_list:
+ # Note that the messages may contain ones which are already assigned or not
+ # executable yet.
+ if activity is self and m.processing_node == -1 and m.date <= now_date:
+ # Call recursively. Set None as a marker to detect a circular dependency.
+ message_dict[message.uid] = None
+ try:
+ self.getExecutableMessageList(activity_tool, m, message_dict,
+ validation_text_dict)
+ finally:
+ del message_dict[message.uid]
+ else:
+ validation_text_dict[message.order_validation_text] = 1
+ message_dict[message.uid] = message
+ elif cached_result:
+ message_dict[message.uid] = message
+ else:
+ pass
+
def isAwake(self, activity_tool, processing_node):
return self.is_awake[processing_node]
@@ -179,22 +241,40 @@
pass
def loadMessage(self, s, **kw):
- m = pickle.loads(s)
+ m = cPickle.load(StringIO(s))
m.__dict__.update(kw)
return m
def dumpMessage(self, m):
- return pickle.dumps(m)
+ return cPickle.dumps(m)
+
+ def getOrderValidationText(self, message):
+ # Return an identifier of validators related to ordering.
+ order_validation_item_list = []
+ key_list = message.activity_kw.keys()
+ key_list.sort()
+ for key in key_list:
+ method_id = "_validate_%s" % key
+ if hasattr(self, method_id):
+ order_validation_item_list.append((key, message.activity_kw[key]))
+ if len(order_validation_item_list) == 0:
+ # When no order validation argument is specified, skip the computation
+ # of the checksum for speed. Here, 'none' is used, because this never be
+ # identical to SHA1 hexdigest (which is always 40 characters), and 'none'
+ # is true in Python. This is important, because dtml-if assumes that an empty
+ # string is false, so we must use a non-empty string for this.
+ return 'none'
+ return sha.new(repr(order_validation_item_list)).hexdigest()
def getMessageList(self, activity_tool, processing_node=None,**kw):
- return []
+ return []
def countMessage(self, activity_tool,**kw):
return 0
def countMessageWithTag(self, activity_tool,value):
return 0
-
+
# Transaction Management
def prepareQueueMessage(self, activity_tool, m):
# Called to prepare transaction commit for queued messages
Modified: erp5/trunk/products/CMFActivity/Activity/RAMDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMDict.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMDict.py Wed Apr 11 03:21:56 2007
@@ -27,8 +27,8 @@
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity
+from Products.CMFActivity.Errors import ActivityFlushError
from Queue import Queue, VALID
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG
Modified: erp5/trunk/products/CMFActivity/Activity/RAMQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/RAMQueue.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/RAMQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/RAMQueue.py Wed Apr 11 03:21:56 2007
@@ -28,7 +28,6 @@
from Products.CMFActivity.ActivityTool import registerActivity
from Queue import Queue, VALID
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
try:
from transaction import get as get_transaction
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Wed Apr 11 03:21:56 2007
@@ -26,17 +26,15 @@
#
##############################################################################
-import random
from DateTime import DateTime
from Products.CMFActivity.ActivityTool import registerActivity
-from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
+from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
from RAMDict import RAMDict
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
+from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
import sys
-import sha
-from types import ClassType, StringType, ListType, TupleType
+from types import ClassType
try:
from transaction import get as get_transaction
@@ -141,50 +139,22 @@
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
- def getOrderValidationText(self, message):
- # Return an identifier of validators related to ordering.
- order_validation_item_list = []
- key_list = message.activity_kw.keys()
- key_list.sort()
- for key in key_list:
- method_id = "_validate_%s" % key
- if hasattr(self, method_id):
- order_validation_item_list.append((key, message.activity_kw[key]))
- if len(order_validation_item_list) == 0:
- # When no order validation argument is specified, skip the computation
- # of the checksum for speed. Here, 'none' is used, because this never be
- # identical to SHA1 hexdigest (which is always 40 characters), and 'none'
- # is true in Python. This is important, because dtml-if assumes that an empty
- # string is false, so we must use a non-empty string for this.
- return 'none'
- return sha.new(repr(order_validation_item_list)).hexdigest()
-
def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
- validation_state = message.validate(self, activity_tool)
+ validation_state = message.validate(self, activity_tool, check_order_validation=0)
if validation_state is not VALID:
- if validation_state in (EXCEPTION, INVALID_PATH):
- # There is a serious validation error - we must lower priority
- if priority > MAX_PRIORITY:
- # This is an error
- if len(uid_list) > 0:
- #LOG('SQLDict', 0, 'error uid_list = %r' % (uid_list,))
- activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
- # Assign message back to 'error' state
- #m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
- else:
- #LOG('SQLDict', 0, 'lower priority uid_list = %r' % (uid_list,))
- # Lower priority
- if len(uid_list) > 0: # Add some delay before new processing
- activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
- priority = priority + 1, retry = 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
+ # There is a serious validation error - we must lower priority
+ if priority > MAX_PRIORITY:
+ # This is an error
+ if len(uid_list) > 0:
+ activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=VALIDATE_ERROR_STATE)
+ # Assign message back to 'error' state
+ #m.notifyUser(activity_tool) # Notify Error
+ get_transaction().commit() # and commit
else:
- # We do not lower priority for INVALID_ORDER errors but we do postpone execution
- #order_validation_text = self.getOrderValidationText(message)
- activity_tool.SQLDict_setPriority(uid = uid_list,
- delay = VALIDATION_ERROR_DELAY,
- retry = 1)
+ # Lower priority
+ if len(uid_list) > 0: # Add some delay before new processing
+ activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
+ priority=priority + 1, retry=1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
return 1
@@ -196,41 +166,29 @@
return 1
now_date = DateTime()
- priority = random.choice(priority_weight)
- # Try to find a message at given priority level which is scheduled for now
- result = readMessage(processing_node=processing_node, priority=priority,
- to_date=now_date)
- if len(result) == 0:
- # If empty, take any message which is scheduled for now
- priority = None
- result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
- if len(result) == 0:
- # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
- # objects quickly.
- self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
- else:
- #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
+ result = readMessage(processing_node=processing_node, to_date=now_date)
+ if len(result) > 0:
line = result[0]
path = line.path
method_id = line.method_id
order_validation_text = line.order_validation_text
- uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
- processing_node = None, to_date = now_date,
- order_validation_text = order_validation_text)
+ uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
+ processing_node=None, to_date=now_date,
+ order_validation_text=order_validation_text)
uid_list = [x.uid for x in uid_list]
uid_list_list = [uid_list]
priority_list = [line.priority]
# Make sure message can not be processed anylonger
if len(uid_list) > 0:
# Set selected messages to processing
- activity_tool.SQLDict_processMessage(uid = uid_list)
+ activity_tool.SQLDict_processMessage(uid=uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
# At this point, messages are marked as processed. So catch any kind of exception to make sure
# that they are unmarked on error.
try:
- m = self.loadMessage(line.message, uid = line.uid)
+ m = self.loadMessage(line.message, uid=line.uid)
message_list = [m]
# Validate message (make sure object exists, priority OK, etc.)
if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
@@ -244,13 +202,13 @@
else:
count = 1
- group_method = activity_tool.restrictedTraverse(group_method_id)
+ group_method = activity_tool.getPortalObject().restrictedTraverse(group_method_id)
if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method.
- result = readMessage(processing_node = processing_node, priority = priority,
- to_date = now_date, group_method_id = group_method_id,
- order_validation_text = order_validation_text)
+ result = readMessage(processing_node=processing_node,
+ to_date=now_date, group_method_id=group_method_id,
+ order_validation_text=order_validation_text)
#LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
path_and_method_id_dict = {}
for line in result:
@@ -263,19 +221,20 @@
continue
path_and_method_id_dict[key] = 1
- uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
- processing_node = None, to_date = now_date,
- order_validation_text = order_validation_text)
+ uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
+ processing_node=None,
+ to_date=now_date,
+ order_validation_text=order_validation_text)
uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0:
# Set selected messages to processing
- activity_tool.SQLDict_processMessage(uid = uid_list)
+ activity_tool.SQLDict_processMessage(uid=uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
# Save this newly marked uids as soon as possible.
uid_list_list.append(uid_list)
- m = self.loadMessage(line.message, uid = line.uid)
+ m = self.loadMessage(line.message, uid=line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod():
count += len(m.getObjectList(activity_tool))
@@ -321,7 +280,8 @@
get_transaction().abort()
except:
# Unfortunately, database adapters may raise an exception against abort.
- LOG('SQLDict', WARNING, 'abort failed, thus some objects may be modified accidentally')
+ LOG('SQLDict', WARNING,
+ 'abort failed, thus some objects may be modified accidentally')
pass
# An exception happens at somewhere else but invoke or invokeGroup, so messages
@@ -330,10 +290,11 @@
for uid_list in uid_list_list:
if len(uid_list):
# This only sets processing to zero.
- activity_tool.SQLDict_setPriority(uid = uid_list)
+ activity_tool.SQLDict_setPriority(uid=uid_list)
get_transaction().commit()
except:
- LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
+ LOG('SQLDict', ERROR,
+ 'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
error=sys.exc_info())
raise
return 0
@@ -345,8 +306,8 @@
priority = priority_list[i]
if m.is_executed:
if len(uid_list) > 0:
- activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
- get_transaction().commit() # If successful, commit
+ activity_tool.SQLDict_delMessage(uid=uid_list) # Delete it
+ get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
@@ -355,24 +316,25 @@
else:
if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
# If this is a conflict error, do not lower the priority but only delay.
- activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
- retry = 1)
+ activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY)
get_transaction().commit() # Release locks before starting a potentially long calculation
elif priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
- activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
+ activity_tool.SQLDict_assignMessage(uid=uid_list,
+ processing_node=INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0:
- activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
- priority = priority + 1, retry = 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
+ activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
+ priority=priority + 1)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
except:
- LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
+ LOG('SQLDict', ERROR,
+ 'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
error=sys.exc_info())
raise
@@ -411,7 +373,7 @@
if readMessageList is not None:
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
- if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
+ if m.object_path == object_path and (method_id is None or method_id == m.method_id):
#if not method_dict.has_key(method_id or m.method_id):
if not method_dict.has_key(m.method_id):
method_dict[m.method_id] = 1 # Prevents calling invoke twice
@@ -469,13 +431,13 @@
if len(uid_list)>0:
activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list])
- def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
+ def getMessageList(self, activity_tool, processing_node=None, include_processing=0, **kw):
# YO: reading all lines might cause a deadlock
message_list = []
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None:
result = readMessageList(path=None, method_id=None, processing_node=None,
- to_processing_date=None,include_processing=include_processing)
+ to_processing_date=None, include_processing=include_processing)
for line in result:
m = self.loadMessage(line.message, uid = line.uid)
m.processing_node = line.processing_node
@@ -496,145 +458,155 @@
return message_list
def distribute(self, activity_tool, node_count):
- processing_node = 1
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None:
now_date = DateTime()
- if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
- # Sticky processing messages should be set back to non processing
- max_processing_date = now_date - MAX_PROCESSING_TIME
- self.max_processing_date = now_date
- else:
- max_processing_date = None
- result = readMessageList(path=None, method_id=None, processing_node = -1,
- to_processing_date = max_processing_date,
- include_processing=0) # Only assign non assigned messages
- get_transaction().commit() # Release locks before starting a potentially long calculation
+ result = readMessageList(path=None, method_id=None, processing_node=-1,
+ to_date=now_date, include_processing=0)
+ get_transaction().commit()
+
+ validation_text_dict = {'none': 1}
+ message_dict = {}
+ for line in result:
+ message = self.loadMessage(line.message, uid = line.uid,
+ order_validation_text = line.order_validation_text)
+ self.getExecutableMessageList(activity_tool, message, message_dict,
+ validation_text_dict)
+
+ # XXX probably this below can be optimized by assigning multiple messages at a time.
path_dict = {}
- for line in result:
- path = line.path
- broadcast = line.broadcast
+ assignMessage = activity_tool.SQLDict_assignMessage
+ processing_node = 1
+ for message in message_dict.itervalues():
+ path = '/'.join(message.object_path)
+ broadcast = message.activity_kw.get('broadcast', 0)
if broadcast:
# Broadcast messages must be distributed into all nodes.
- uid = line.uid
- activity_tool.SQLDict_assignMessage(processing_node=1, uid=[uid])
+ uid = message.uid
+ assignMessage(processing_node=1, uid=[uid])
if node_count > 1:
- uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', id_count=node_count - 1)
- for node in range(2, node_count+1):
- activity_tool.SQLDict_writeMessage( uid = uid_list.pop(),
- path = path,
- method_id = line.method_id,
- priority = line.priority,
- broadcast = 1,
- processing_node = node,
- message = line.message,
- date = line.date)
- elif not path_dict.has_key(path):
- # Only assign once (it would be different for a queue)
- path_dict[path] = 1
- activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None, broadcast=0)
+ id_tool = activity_tool.getPortalObject().portal_ids
+ uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
+ id_count=node_count - 1)
+ path_list = [path] * (node_count - 1)
+ method_id_list = [message.method_id] * (node_count - 1)
+ priority_list = [message.activity_kw.get('priority', 1)] * (node_count - 1)
+ processing_node_list = range(2, node_count + 1)
+ broadcast_list = [1] * (node_count - 1)
+ message_list = [self.dumpMessage(message)] * (node_count - 1)
+ date_list = [message.activity_kw.get('at_date', now_date)] * (node_count - 1)
+ group_method_id_list = [message.activity_kw.get('group_method_id', '')] * (node_count - 1)
+ tag_list = [message.activity_kw.get('tag', '')] * (node_count - 1)
+ order_validation_text_list = [message.order_validation_text] * (node_count - 1)
+ activity_tool.SQLDict_writeMessageList(uid_list=uid_list,
+ path_list=path_list,
+ method_id_list=method_id_list,
+ priority_list=priority_list,
+ broadcast_list=broadcast_list,
+ processing_node_list=processing_node_list,
+ message_list=message_list,
+ date_list=date_list,
+ group_method_id_list=group_method_id_list,
+ tag_list=tag_list,
+ order_validation_text_list=order_validation_text_list)
+ get_transaction().commit()
+ else:
+ # Select a processing node. If the same path appears again, dispatch the message to
+ # the same node, so that object caching is more efficient. Otherwise, apply a round
+ # robin scheduling.
+ node = path_dict.get(path)
+ if node is None:
+ node = processing_node
+ path_dict[path] = node
+ processing_node += 1
+ if processing_node > node_count:
+ processing_node = 1
+
+ assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages
- processing_node = processing_node + 1
- if processing_node > node_count:
- processing_node = 1 # Round robin
# Validation private methods
+ def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
+ if isinstance(method_id, str):
+ method_id = [method_id]
+ if isinstance(path, str):
+ path = [path]
+ if isinstance(tag, str):
+ tag = [tag]
+
+ if method_id or message_uid or path or tag:
+ validateMessageList = activity_tool.SQLDict_validateMessageList
+ result = validateMessageList(method_id=method_id,
+ message_uid=message_uid,
+ path=path,
+ tag=tag)
+ message_list = []
+ for line in result:
+ m = self.loadMessage(line.message,
+ uid=line.uid,
+ order_validation_text=line.order_validation_text,
+ date=line.date,
+ processing_node=line.processing_node)
+ message_list.append(m)
+ return message_list
+ else:
+ return []
+
def _validate_after_method_id(self, activity_tool, message, value):
- # Count number of occurances of method_id
- if type(value) is StringType:
- value = [value]
- if len(value)>0: # if empty list provided, the message is valid
- result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self._validate(activity_tool, method_id=value)
def _validate_after_path(self, activity_tool, message, value):
- # Count number of occurances of path
- if type(value) is StringType:
- value = [value]
- if len(value)>0: # if empty list provided, the message is valid
- result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self._validate(activity_tool, path=value)
def _validate_after_message_uid(self, activity_tool, message, value):
- # Count number of occurances of message_uid
- result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self._validate(activity_tool, message_uid=value)
def _validate_after_path_and_method_id(self, activity_tool, message, value):
- # Count number of occurances of path and method_id
- if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
- LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method_id : %s' % repr(value))
- return VALID
- path = value[0]
- method = value[1]
- if type(path) is StringType:
+ if not isinstance(value, (tuple, list)) or len(value) < 2:
+ LOG('CMFActivity', WARNING,
+ 'unable to recognize value for after_path_and_method_id: %r' % (value,))
+ return []
+ return self._validate(activity_tool, path=value[0], method_id=value[1])
+
+ def _validate_after_tag(self, activity_tool, message, value):
+ return self._validate(activity_tool, tag=value)
+
+ def _validate_after_tag_and_method_id(self, activity_tool, message, value):
+ # Count number of occurances of tag and method_id
+ if not isinstance(value, (tuple, list)) or len(value) < 2:
+ LOG('CMFActivity', WARNING,
+ 'unable to recognize value for after_tag_and_method_id: %r' % (value,))
+ return []
+ return self._validate(activity_tool, tag=value[0], method_id=value[1])
+
+ def countMessage(self, activity_tool, tag=None, path=None,
+ method_id=None, message_uid=None, **kw):
+ """Return the number of messages which match the given parameters.
+ """
+ if isinstance(tag, str):
+ tag = [tag]
+ if isinstance(path, str):
path = [path]
- if type(method) is StringType:
- method = [method]
- result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, path=path)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
-
- def _validate_after_tag(self, activity_tool, message, value):
- # Count number of occurances of tag
- if self.countMessageWithTag(activity_tool, value) > 0:
- return INVALID_ORDER
- return VALID
-
- def countMessage(self, activity_tool, tag=None,path=None,
- method_id=None,message_uid=None,**kw):
- """
- Return the number of message which match the given parameter.
- """
- if isinstance(tag, StringType):
- tag = [tag]
- if isinstance(path, StringType):
- path = [path]
- if isinstance(message_uid, (int,long)):
- message_uid = [message_uid]
- if isinstance(method_id, StringType):
+ elif isinstance(method_id, str):
method_id = [method_id]
result = activity_tool.SQLDict_validateMessageList(method_id=method_id,
path=path,
message_uid=message_uid,
- tag=tag)
+ tag=tag,
+ count=1)
return result[0].uid_count
def countMessageWithTag(self, activity_tool, value):
+ """Return the number of messages which match the given tag.
"""
- Return the number of message which match the given tag.
- """
- return self.countMessage(activity_tool,tag=value)
-
- def _validate_after_tag_and_method_id(self, activity_tool, message, value):
- # Count number of occurances of tag and method_id
- if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
- LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
- return VALID
- tag = value[0]
- method = value[1]
- if type(tag) is StringType:
- tag = [tag]
- if type(method) is StringType:
- method = [method]
- result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, tag=tag)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self.countMessage(activity_tool, tag=value)
# Required for tests (time shift)
- def timeShift(self, activity_tool, delay, processing_node=None,retry=None):
+ def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
"""
To simulate timeShift, we simply substract delay from
all dates in SQLDict message table
"""
- activity_tool.SQLDict_timeShift(delay = delay, processing_node = processing_node,retry=retry)
+ activity_tool.SQLDict_timeShift(delay=delay, processing_node=processing_node,retry=retry)
registerActivity(SQLDict)
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Wed Apr 11 03:21:56 2007
@@ -26,15 +26,14 @@
#
##############################################################################
-import random
from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue
from DateTime import DateTime
-from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
-from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
+from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
+from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
-from types import StringType, ClassType
+from types import ClassType
import sys
try:
@@ -61,8 +60,9 @@
"""
def prepareQueueMessage(self, activity_tool, m):
if m.is_registered:
- #import pdb; pdb.set_trace()
- activity_tool.SQLQueue_writeMessage(uid = activity_tool.getPortalObject().portal_ids.generateNewLengthId(id_group='portal_activity_queue'),
+ id_tool = activity_tool.getPortalObject().portal_ids
+ uid = id_tool.generateNewLengthId(id_group='portal_activity_queue')
+ activity_tool.SQLQueue_writeMessage(uid = uid,
path = '/'.join(m.object_path) ,
method_id = m.method_id,
priority = m.activity_kw.get('priority', 1),
@@ -84,13 +84,7 @@
now_date = DateTime()
# Next processing date in case of error
next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
- priority = random.choice(priority_weight)
- # Try to find a message at given priority level
- result = readMessage(processing_node=processing_node, priority=priority,
- to_date=now_date)
- if len(result) == 0:
- # If empty, take any message
- result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
+ result = readMessage(processing_node=processing_node, to_date=now_date)
if len(result) > 0:
line = result[0]
path = line.path
@@ -103,23 +97,17 @@
try:
m = self.loadMessage(line.message)
# Make sure object exists
- validation_state = m.validate(self, activity_tool)
+ validation_state = m.validate(self, activity_tool, check_order_validation=0)
if validation_state is not VALID:
- if validation_state in (EXCEPTION, INVALID_PATH):
- if line.priority > MAX_PRIORITY:
- # This is an error.
- # Assign message back to 'error' state.
- activity_tool.SQLQueue_assignMessage(uid=line.uid,
- processing_node = VALIDATE_ERROR_STATE)
- get_transaction().commit() # and commit
- else:
- # Lower priority
- activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
+ if line.priority > MAX_PRIORITY:
+ # This is an error.
+ # Assign message back to 'error' state.
+ activity_tool.SQLQueue_assignMessage(uid=line.uid,
+ processing_node=VALIDATE_ERROR_STATE)
+ get_transaction().commit() # and commit
else:
- # We do not lower priority for INVALID_ORDER errors but we do postpone execution
- activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
- priority = line.priority)
+ # Lower priority
+ activity_tool.SQLQueue_setPriority(uid=line.uid, priority=line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
@@ -139,8 +127,8 @@
# An exception happens at somewhere else but invoke, so messages
# themselves should not be delayed.
try:
- activity_tool.SQLQueue_setPriority(uid = line.uid, date = line.date,
- priority = line.priority)
+ activity_tool.SQLQueue_setPriority(uid=line.uid, date=line.date,
+ priority=line.priority)
get_transaction().commit()
except:
LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised, and cannot even set processing to zero due to an exception',
@@ -159,25 +147,26 @@
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
pass
-
+
if type(m.exc_type) is ClassType \
and issubclass(m.exc_type, ConflictError):
- activity_tool.SQLQueue_setPriority(uid = line.uid,
- date = next_processing_date,
- priority = line.priority)
+ activity_tool.SQLQueue_setPriority(uid=line.uid,
+ date=next_processing_date,
+ priority=line.priority)
elif line.priority > MAX_PRIORITY:
# This is an error
- activity_tool.SQLQueue_assignMessage(uid = line.uid,
- processing_node = INVOKE_ERROR_STATE)
+ activity_tool.SQLQueue_assignMessage(uid=line.uid,
+ processing_node=INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
else:
# Lower priority
- activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
- priority = line.priority + 1)
+ activity_tool.SQLQueue_setPriority(uid=line.uid, date=next_processing_date,
+ priority=line.priority + 1)
get_transaction().commit()
except:
- LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
+ LOG('SQLQueue', ERROR,
+ 'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
error=sys.exc_info())
raise
return 0
@@ -215,32 +204,45 @@
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
- if invoke: activity_tool.invoke(m)
- activity_tool.unregisterMessage(self, m)
- # Parse each message in SQL queue
- #LOG('Flush', 0, str((path, invoke, method_id)))
- result = readMessageList(path=path, method_id=method_id,processing_node=None)
- #LOG('Flush', 0, str(len(result)))
- method_dict = {}
- for line in result:
- path = line.path
- method_id = line.method_id
- if not method_dict.has_key(method_id):
- # Only invoke once (it would be different for a queue)
- method_dict[method_id] = 1
- m = self.loadMessage(line.message, uid = line.uid)
if invoke:
# First Validate
- if m.validate(self, activity_tool) is VALID:
+ validate_value = m.validate(self, activity_tool)
+ if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
- 'Could not evaluate %s on %s' % (method_id , path))
- else:
+ 'Could not evaluate %s on %s' % (m.method_id , path))
+ elif validate_value is INVALID_PATH:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
+ else:
+ raise ActivityFlushError, (
+ 'Could not validate %s on %s' % (m.method_id , path))
+ activity_tool.unregisterMessage(self, m)
+ # Parse each message in SQL queue
+ result = readMessageList(path=path, method_id=method_id, processing_node=None)
+ for line in result:
+ path = line.path
+ method_id = line.method_id
+ m = self.loadMessage(line.message, uid = line.uid)
+ if invoke:
+ # First Validate
+ validate_value = m.validate(self, activity_tool)
+ if validate_value is VALID:
+ activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
+ if not m.is_executed: # Make sure message could be invoked
+ # The message no longer exists
+ raise ActivityFlushError, (
+ 'Could not evaluate %s on %s' % (method_id , path))
+ elif validate_value is INVALID_PATH:
+ # The message no longer exists
+ raise ActivityFlushError, (
+ 'The document %s does not exist' % path)
+ else:
+ raise ActivityFlushError, (
+ 'Could not validate %s on %s' % (m.method_id , path))
if len(result):
activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
@@ -265,31 +267,27 @@
message_list.append(m)
return message_list
- def countMessage(self, activity_tool, tag=None,path=None,
- method_id=None,message_uid=None,**kw):
- """
- Return the number of message which match the given parameter.
- """
- if isinstance(tag, StringType):
+ def countMessage(self, activity_tool, tag=None, path=None,
+ method_id=None, message_uid=None, **kw):
+ """Return the number of messages which match the given parameters.
+ """
+ if isinstance(tag, str):
tag = [tag]
- if isinstance(path, StringType):
+ if isinstance(path, str):
path = [path]
- if isinstance(message_uid, (int,long)):
- message_uid = [message_uid]
- if isinstance(method_id, StringType):
+ if isinstance(method_id, str):
method_id = [method_id]
result = activity_tool.SQLQueue_validateMessageList(method_id=method_id,
- path=path,
- message_uid=message_uid,
- tag=tag)
+ path=path,
+ message_uid=message_uid,
+ tag=tag,
+ count=1)
return result[0].uid_count
def countMessageWithTag(self, activity_tool, value):
- """
- Return the number of message which match the given tag.
- """
- return self.countMessage(activity_tool,tag=value)
-
+ """Return the number of messages which match the given tag.
+ """
+ return self.countMessage(activity_tool, tag=value)
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
@@ -306,105 +304,115 @@
processing_node = 1
readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
if readMessageList is not None:
- result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
- #LOG('distribute count',0,str(len(result)) )
- #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
- #get_transaction().commit() # Release locks before starting a potentially long calculation
- result = list(result)[0:100]
+ now_date = DateTime()
+ result = readMessageList(path=None, method_id=None,
+ processing_node=-1, to_date=now_date)
+ get_transaction().commit()
+
+ validation_text_dict = {'none': 1}
+ message_dict = {}
for line in result:
- broadcast = line.broadcast
- uid = line.uid
+ message = self.loadMessage(line.message, uid=line.uid)
+ message.order_validation_text = self.getOrderValidationText(message)
+ self.getExecutableMessageList(activity_tool, message, message_dict,
+ validation_text_dict)
+
+ # XXX probably this below can be optimized by assigning multiple messages at a time.
+ path_dict = {}
+ assignMessage = activity_tool.SQLQueue_assignMessage
+ processing_node = 1
+ for message in message_dict.itervalues():
+ path = '/'.join(message.object_path)
+ broadcast = message.activity_kw.get('broadcast', 0)
if broadcast:
# Broadcast messages must be distributed into all nodes.
- activity_tool.SQLQueue_assignMessage(processing_node=1, uid=uid)
+ assignMessage(processing_node=1, uid=message.uid)
if node_count > 1:
- uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity_queue', id_count=node_count - 1)
- for node in range(2, node_count+1):
- activity_tool.SQLQueue_writeMessage(uid = uid_list.pop(),
- path = line.path,
- method_id = line.method_id,
- priority = line.priority,
- broadcast = 1,
- processing_node = node,
- message = line.message,
- date = line.date)
+ id_tool = activity_tool.getPortalObject().portal_ids
+ uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity_queue',
+ id_count=node_count - 1)
+ priority = message.activity_kw.get('priority', 1)
+ dumped_message = self.dumpMessage(message)
+ date = message.activity_kw.get('at_date', now_date)
+ tag = message.activity_kw.get('tag', '')
+ for node in xrange(2, node_count+1):
+ activity_tool.SQLQueue_writeMessage(uid=uid_list.pop(),
+ path=path,
+ method_id=message.method_id,
+ priority=priority,
+ broadcast=1,
+ processing_node=node,
+ message=dumped_message,
+ date=date,
+ tag=tag)
+ get_transaction().commit()
else:
- #LOG("distribute", 0, "assign %s" % uid)
- activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
- #get_transaction().commit() # Release locks immediately to allow processing of messages
- processing_node = processing_node + 1
- if processing_node > node_count:
- processing_node = 1 # Round robin
+ # Select a processing node. If the same path appears again, dispatch the message to
+ # the same node, so that object caching is more efficient. Otherwise, apply a round
+ # robin scheduling.
+ node = path_dict.get(path)
+ if node is None:
+ node = processing_node
+ path_dict[path] = node
+ processing_node += 1
+ if processing_node > node_count:
+ processing_node = 1
+
+ assignMessage(processing_node=node, uid=message.uid, broadcast=0)
+ get_transaction().commit() # Release locks immediately to allow processing of messages
# Validation private methods
+ def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
+ if isinstance(method_id, str):
+ method_id = [method_id]
+ if isinstance(path, str):
+ path = [path]
+ if isinstance(tag, str):
+ tag = [tag]
+
+ if method_id or message_uid or path or tag:
+ validateMessageList = activity_tool.SQLQueue_validateMessageList
+ result = validateMessageList(method_id=method_id,
+ message_uid=message_uid,
+ path=path,
+ tag=tag)
+ message_list = []
+ for line in result:
+ m = self.loadMessage(line.message,
+ uid=line.uid,
+ date=line.date,
+ processing_node=line.processing_node)
+ m.order_validation_text = self.getOrderValidationText(m)
+ message_list.append(m)
+ return message_list
+ else:
+ return []
+
def _validate_after_method_id(self, activity_tool, message, value):
- # Count number of occurances of method_id
- #get_transaction().commit()
- if type(value) == type(''):
- value = [value]
- result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
- #LOG('SQLQueue._validate_after_method_id, method_id',0,value)
- #LOG('SQLQueue._validate_after_method_id, result[0].uid_count',0,result[0].uid_count)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self._validate(activity_tool, method_id=value)
def _validate_after_path(self, activity_tool, message, value):
- # Count number of occurances of path
- if type(value) == type(''):
- value = [value]
- result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self._validate(activity_tool, path=value)
def _validate_after_message_uid(self, activity_tool, message, value):
- # Count number of occurances of message_uid
- result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self._validate(activity_tool, message_uid=value)
def _validate_after_path_and_method_id(self, activity_tool, message, value):
- # Count number of occurances of method_id and path
- if (type(value) != type( (0,) ) and type(value) != type ([])) or len(value)<2:
- LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method : %s' % repr(value))
- return VALID
- path = value[0]
- method = value[1]
- if type(path) == type(''):
- path = [path]
- if type(method) == type(''):
- method = [method]
- result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, path=path)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ if not isinstance(value, (tuple, list)) or len(value) < 2:
+ LOG('CMFActivity', WARNING,
+ 'unable to recognize value for after_path_and_method: %r' % (value,))
+ return []
+ return self._validate(activity_tool, path=value[0], method_id=value[1])
def _validate_after_tag(self, activity_tool, message, value):
- # Count number of occurances of tag
- if type(value) == type(''):
- value = [value]
- result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, tag=value)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ return self._validate(activity_tool, tag=value)
def _validate_after_tag_and_method_id(self, activity_tool, message, value):
- # Count number of occurances of tag and method_id
- if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2:
- LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
- return VALID
- tag = value[0]
- method = value[1]
- if type(tag) == type(''):
- tag = [tag]
- if type(method) == type(''):
- method = [method]
- result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, tag=tag)
- if result[0].uid_count > 0:
- return INVALID_ORDER
- return VALID
+ if not isinstance(value, (tuple, list)) or len(value) < 2:
+ LOG('CMFActivity', WARNING,
+ 'unable to recognize value for after_tag_and_method_id: %r' % (value,))
+ return []
+ return self._validate(activity_tool, tag=value[0], method_id=value[1])
# Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node = None):
@@ -412,6 +420,6 @@
To simulate timeShift, we simply substract delay from
all dates in SQLDict message table
"""
- activity_tool.SQLQueue_timeShift(delay = delay, processing_node = processing_node)
+ activity_tool.SQLQueue_timeShift(delay=delay, processing_node=processing_node)
registerActivity(SQLQueue)
Modified: erp5/trunk/products/CMFActivity/ActivityTool.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/ActivityTool.py?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/ActivityTool.py (original)
+++ erp5/trunk/products/CMFActivity/ActivityTool.py Wed Apr 11 03:21:56 2007
@@ -86,14 +86,14 @@
class Message:
"""Activity Message Class.
-
+
Message instances are stored in an activity queue, inside the Activity Tool.
"""
- def __init__(self, object, active_process, activity_kw, method_id, args, kw):
- if type(object) is StringType:
- self.object_path = object.split('/')
+ def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
+ if isinstance(obj, str):
+ self.object_path = obj.split('/')
else:
- self.object_path = object.getPhysicalPath()
+ self.object_path = obj.getPhysicalPath()
if type(active_process) is StringType:
self.active_process = active_process.split('/')
elif active_process is None:
@@ -114,7 +114,7 @@
def getObject(self, activity_tool):
"""return the object referenced in this message."""
return activity_tool.unrestrictedTraverse(self.object_path)
-
+
def getObjectList(self, activity_tool):
"""return the list of object that can be expanded from this message."""
try:
@@ -124,9 +124,9 @@
object_list = getattr(obj, expand_method_id)()
except KeyError:
object_list = [self.getObject(activity_tool)]
-
+
return object_list
-
+
def hasExpandMethod(self):
"""return true if the message has an expand method.
An expand method is used to expand the list of objects and to turn a
@@ -134,7 +134,7 @@
transactions affecting only one object at a time (this can prevent
duplicated method calls)."""
return self.activity_kw.has_key('expand_method_id')
-
+
def changeUser(self, user_name, activity_tool):
"""restore the security context for the calling user."""
uf = activity_tool.getPortalObject().acl_users
@@ -169,7 +169,7 @@
ActiveResult(object_path=object,
method_id=self.method_id,
result=result)) # XXX Allow other method_id in future
-
+
def __call__(self, activity_tool):
try:
obj = self.getObject(activity_tool)
@@ -198,8 +198,13 @@
if hasattr(activity_tool, 'error_log'):
activity_tool.error_log.raising(sys.exc_info())
- def validate(self, activity, activity_tool):
- return activity.validate(activity_tool, self, **self.activity_kw)
+ def validate(self, activity, activity_tool, check_order_validation=1):
+ return activity.validate(activity_tool, self,
+ check_order_validation=check_order_validation,
+ **self.activity_kw)
+
+ def getDependentMessageList(self, activity, activity_tool):
+ return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
def notifyUser(self, activity_tool, message="Failed Processing Activity"):
"""Notify the user that the activity failed."""
@@ -477,9 +482,9 @@
REQUEST.URL1 +
'/manageLoadBalancing?manage_tabs_message=' +
urllib.quote("Node(s) successfully deleted."))
-
+
def process_timer(self, tick, interval, prev="", next=""):
- """
+ """
Call distribute() if we are the Distributing Node and call tic()
with our node number.
This method is called by TimerService in the interval given
@@ -489,23 +494,23 @@
acquired = timerservice_lock.acquire(0)
if not acquired:
return
-
+
old_sm = getSecurityManager()
try:
# get owner of portal_catalog, so normally we should be able to
# have the permission to invoke all activities
user = self.portal_catalog.getWrappedOwner()
newSecurityManager(self.REQUEST, user)
-
+
currentNode = self.getCurrentNode()
-
+
# only distribute when we are the distributingNode or if it's empty
if (self.distributingNode == self.getCurrentNode()):
self.distribute(len(self._nodes))
elif not self.distributingNode:
self.distribute(1)
-
+
# SkinsTool uses a REQUEST cache to store skin objects, as
# with TimerService we have the same REQUEST over multiple
# portals, we clear this cache to make sure the cache doesn't
@@ -513,13 +518,13 @@
stool = getToolByName(self, 'portal_skins', None)
if stool is not None:
stool.changeSkin(None)
-
+
# call tic for the current processing_node
# the processing_node numbers are the indices of the elements in the node tuple +1
# because processing_node starts form 1
if currentNode in self._nodes:
self.tic(list(self._nodes).index(currentNode)+1)
-
+
elif len(self._nodes) == 0:
self.tic(1)
@@ -566,9 +571,9 @@
# Initialize if needed
if not is_initialized: self.initialize()
-
+
inner_self = aq_inner(self)
-
+
# If this is the first tic after zope is started, reset the processing
# flag for activities of this node
if first_run:
@@ -587,7 +592,7 @@
raise
except:
LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)
-
+
# Process messages on each queue in round robin
has_awake_activity = 1
while has_awake_activity:
@@ -673,7 +678,7 @@
def invoke(self, message):
message(self)
-
+
def invokeGroup(self, method_id, message_list):
# Invoke a group method.
object_list = []
@@ -770,7 +775,7 @@
LOG('ActivityTool', WARNING,
'Could not call method %s on object %s' % (
m.method_id, m.object_path), error=sys.exc_info())
-
+
def newMessage(self, activity, path, active_process,
activity_kw, method_id, *args, **kw):
# Some Security Cheking should be made here XXX
@@ -826,8 +831,8 @@
LOG('ActivityTool', WARNING,
'could not dump messages from %s' %
(activity,), error=sys.exc_info())
-
- if hasattr(folder, 'SQLDict_createMessageTable'):
+
+ if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
try:
folder.SQLDict_dropMessageTable()
except ConflictError:
@@ -838,7 +843,7 @@
error=sys.exc_info())
folder.SQLDict_createMessageTable()
- if hasattr(folder, 'SQLQueue_createMessageTable'):
+ if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
try:
folder.SQLQueue_dropMessageTable()
except ConflictError:
@@ -920,16 +925,24 @@
self.immediateReindexObject()
# Active synchronisation methods
+ security.declarePrivate('validateOrder')
def validateOrder(self, message, validator_id, validation_value):
+ message_list = self.getDependentMessageList(message, validator_id, validation_value)
+ return len(message_list) > 0
+
+ security.declarePrivate('getDependentMessageList')
+ def getDependentMessageList(self, message, validator_id, validation_value):
global is_initialized
if not is_initialized: self.initialize()
+ message_list = []
for activity in activity_list:
method_id = "_validate_%s" % validator_id
- if hasattr(activity, method_id):
- if getattr(activity,method_id)(aq_inner(self),
- message, validation_value):
- return 1
- return 0
+ method = getattr(activity, method_id, None)
+ if method is not None:
+ result = method(aq_inner(self), message, validation_value)
+ if result:
+ message_list.extend([(activity, m) for m in result])
+ return message_list
# Required for tests (time shift)
def timeShift(self, delay):
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_clearProcessingFlag.zsql Wed Apr 11 03:21:56 2007
@@ -2,12 +2,12 @@
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
-max_cache:100
+max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
-<params>processing_node:int=-1</params>
+<params>processing_node</params>
UPDATE
message
SET
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql Wed Apr 11 03:21:56 2007
@@ -2,7 +2,7 @@
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
-max_cache:100
+max_cache:0
cache_time:0
class_name:
class_file:
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_processMessage.zsql Wed Apr 11 03:21:56 2007
@@ -13,5 +13,6 @@
processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">,
processing = 1
WHERE
-<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>
- OR </dtml-if></dtml-in>
+ uid IN (
+<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
+ )
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -11,19 +11,9 @@
method_id
processing_node
priority
-to_processing_date
-include_processing</params>
-<dtml-if to_processing_date>UPDATE message
-SET
- processing = 0
-WHERE
- processing = 1
-AND
- processing_date < <dtml-sqlvar to_processing_date type="datetime">
-
-<dtml-var "'\0'">
-
-</dtml-if>SELECT * FROM
+include_processing
+to_date</params>
+SELECT * FROM
message
WHERE
1 = 1
@@ -34,5 +24,6 @@
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
+<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY
priority, date, uid
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_readUidList.zsql Wed Apr 11 03:21:56 2007
@@ -1,8 +1,8 @@
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
-max_rows:10000
-max_cache:100
+max_rows:1000
+max_cache:0
cache_time:0
class_name:
class_file:
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -11,32 +11,31 @@
message_uid
path
tag
+count
</params>
SELECT
- COUNT(DISTINCT uid) as uid_count
+<dtml-if count>
+ COUNT(*) AS uid_count
+<dtml-else>
+ *
+</dtml-if>
FROM
message
WHERE
processing_node >= -2
<dtml-if method_id>
- AND (
-<dtml-in method_id>
- method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+ AND method_id IN (
+<dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if path>
- AND (
-<dtml-in path>
- path = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+ AND path IN (
+<dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if tag>
- AND (
-<dtml-in tag>
- tag = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+ AND tag IN (
+<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql Wed Apr 11 03:21:56 2007
@@ -11,7 +11,7 @@
processing_node
method_id
broadcast
-uid:int=0</params>
+uid</params>
UPDATE message_queue
SET
processing_node=<dtml-sqlvar processing_node type="int">,
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_clearProcessingFlag.zsql Wed Apr 11 03:21:56 2007
@@ -7,7 +7,7 @@
class_name:
class_file:
</dtml-comment>
-<params>processing_node:int=-1</params>
+<params>processing_node</params>
UPDATE
message_queue
SET
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql Wed Apr 11 03:21:56 2007
@@ -2,7 +2,7 @@
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
-max_cache:100
+max_cache:0
cache_time:0
class_name:
class_file:
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessage.zsql Wed Apr 11 03:21:56 2007
@@ -19,4 +19,4 @@
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY
- priority, date
+ priority, date, uid
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -10,7 +10,8 @@
<params>path
method_id
processing_node
-priority</params>
+priority
+to_date</params>
SELECT * FROM
message_queue
WHERE
@@ -19,3 +20,6 @@
<dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
+<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
+ORDER BY
+ priority, date, uid
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_readUidList.zsql Wed Apr 11 03:21:56 2007
@@ -1,8 +1,8 @@
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
-max_rows:10000
-max_cache:100
+max_rows:1000
+max_cache:0
cache_time:0
class_name:
class_file:
@@ -16,5 +16,5 @@
WHERE
processing <> 1
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
-<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
-<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
+<dtml-if path> AND path = <dtml-sqlvar path type="string"></dtml-if>
+<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_setPriority.zsql Wed Apr 11 03:21:56 2007
@@ -11,10 +11,10 @@
priority
date</params>
UPDATE
- message_queue
+ message_queue
SET
- priority = <dtml-sqlvar priority type="int">,
- processing = 0,
- date = <dtml-sqlvar date type="datetime">
+ priority = <dtml-sqlvar priority type="int">,
+ processing = 0,
+ date = <dtml-sqlvar date type="datetime">
WHERE
- uid = <dtml-sqlvar uid type="int">
+ uid = <dtml-sqlvar uid type="int">
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql Wed Apr 11 03:21:56 2007
@@ -11,32 +11,31 @@
message_uid
path
tag
+count
</params>
SELECT
- COUNT(DISTINCT uid) as uid_count
+<dtml-if count>
+ COUNT(*) AS uid_count
+<dtml-else>
+ *
+</dtml-if>
FROM
message_queue
WHERE
processing_node >= -2
<dtml-if method_id>
- AND (
-<dtml-in method_id>
- method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+ AND method_id IN (
+<dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if path>
- AND (
-<dtml-in path>
- path = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+ AND path IN (
+<dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if tag>
- AND (
-<dtml-in tag>
- tag = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
-</dtml-in>
+ AND tag IN (
+<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
Modified: erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql?rev=14039&r1=14038&r2=14039&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql Wed Apr 11 03:21:56 2007
@@ -13,18 +13,20 @@
message
priority
broadcast
-processing_node=-1
+processing_node
date
tag</params>
INSERT INTO message_queue
SET
uid = <dtml-sqlvar uid type="int">,
- path = <dtml-sqlvar path type="string">,
+ path = <dtml-sqlvar path type="string">,
<dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if>
- method_id = <dtml-sqlvar method_id type="string">,
- processing_node = <dtml-sqlvar processing_node type="int">,
- broadcast = <dtml-sqlvar broadcast type="int">,
- processing = -1,
- priority = <dtml-sqlvar priority type="int">,
- tag = <dtml-sqlvar tag type="string">,
- message = <dtml-sqlvar message type="string">
+ method_id = <dtml-sqlvar method_id type="string">,
+<dtml-if processing_node>
+ processing_node = <dtml-sqlvar processing_node type="int">,
+</dtml-if>
+ broadcast = <dtml-sqlvar broadcast type="int">,
+ processing = -1,
+ priority = <dtml-sqlvar priority type="int">,
+ tag = <dtml-sqlvar tag type="string">,
+ message = <dtml-sqlvar message type="string">
More information about the Erp5-report
mailing list