[Erp5-report] r8156 - in /erp5/trunk/products/CMFActivity/Activity: SQLDict.py SQLQueue.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Jun 23 13:58:32 CEST 2006
Author: yo
Date: Fri Jun 23 13:58:28 2006
New Revision: 8156
URL: http://svn.erp5.org?rev=8156&view=rev
Log:
Do not use hasattr, because hasattr drains exceptions.
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=8156&r1=8155&r2=8156&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Jun 23 13:58:28 2006
@@ -103,7 +103,7 @@
group_method_id_list = group_method_id_list,
tag_list = tag_list,
order_validation_text_list = order_validation_text_list)
-
+
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
path = '/'.join(m.object_path)
@@ -165,7 +165,7 @@
# string is false, so we must use a non-empty string for this.
return 'none'
return sha.new(repr(order_validation_item_list)).hexdigest()
-
+
def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
validation_state = message.validate(self, activity_tool)
if validation_state is not VALID:
@@ -195,159 +195,163 @@
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
return 1
-
+
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
- if hasattr(activity_tool,'SQLDict_readMessage'):
- now_date = DateTime()
- priority = random.choice(priority_weight)
- # Try to find a message at given priority level which is scheduled for now
- result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
- to_date=now_date)
- if len(result) == 0:
- # If empty, take any message which is scheduled for now
- priority = None
- result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
- if len(result) == 0:
- # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
- # objects quickly.
- self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
- elif len(result) > 0:
- #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
- line = result[0]
- path = line.path
- method_id = line.method_id
- order_validation_text = line.order_validation_text
- uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
- processing_node = None, to_date = now_date,
- order_validation_text = order_validation_text)
- uid_list = [x.uid for x in uid_list]
- uid_list_list = [uid_list]
- priority_list = [line.priority]
- # Make sure message can not be processed anylonger
- if len(uid_list) > 0:
- # Set selected messages to processing
- activity_tool.SQLDict_processMessage(uid = uid_list)
- get_transaction().commit() # Release locks before starting a potentially long calculation
- # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
- m = self.loadMessage(line.message, uid = line.uid)
- message_list = [m]
- # Validate message (make sure object exists, priority OK, etc.)
- if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
- group_method_id = m.activity_kw.get('group_method_id')
- if group_method_id is not None:
- # Count the number of objects to prevent too many objects.
- if m.hasExpandMethod():
- try:
- count = len(m.getObjectList(activity_tool))
- except:
- # Here, simply ignore an exception. The same exception should be handled later.
- LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
- count = 0
+ readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
+ if readMessage is None:
+ return 1
+
+ now_date = DateTime()
+ priority = random.choice(priority_weight)
+ # Try to find a message at given priority level which is scheduled for now
+ result = readMessage(processing_node=processing_node, priority=priority,
+ to_date=now_date)
+ if len(result) == 0:
+ # If empty, take any message which is scheduled for now
+ priority = None
+ result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
+ if len(result) == 0:
+ # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
+ # objects quickly.
+ self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
+ elif len(result) > 0:
+ #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
+ line = result[0]
+ path = line.path
+ method_id = line.method_id
+ order_validation_text = line.order_validation_text
+ uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
+ processing_node = None, to_date = now_date,
+ order_validation_text = order_validation_text)
+ uid_list = [x.uid for x in uid_list]
+ uid_list_list = [uid_list]
+ priority_list = [line.priority]
+ # Make sure message can not be processed anylonger
+ if len(uid_list) > 0:
+ # Set selected messages to processing
+ activity_tool.SQLDict_processMessage(uid = uid_list)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
+ # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
+ m = self.loadMessage(line.message, uid = line.uid)
+ message_list = [m]
+ # Validate message (make sure object exists, priority OK, etc.)
+ if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+ group_method_id = m.activity_kw.get('group_method_id')
+ if group_method_id is not None:
+ # Count the number of objects to prevent too many objects.
+ if m.hasExpandMethod():
+ try:
+ count = len(m.getObjectList(activity_tool))
+ except:
+ # Here, simply ignore an exception. The same exception should be handled later.
+ LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
+ count = 0
+ else:
+ count = 1
+
+ group_method = activity_tool.restrictedTraverse(group_method_id)
+
+ if count < MAX_GROUPED_OBJECTS:
+ # Retrieve objects which have the same group method.
+ result = readMessage(processing_node = processing_node, priority = priority,
+ to_date = now_date, group_method_id = group_method_id,
+ order_validation_text = order_validation_text)
+ #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
+ for line in result:
+ path = line.path
+ method_id = line.method_id
+ uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
+ processing_node = None, to_date = now_date,
+ order_validation_text = order_validation_text)
+ uid_list = [x.uid for x in uid_list]
+ if len(uid_list) > 0:
+ # Set selected messages to processing
+ activity_tool.SQLDict_processMessage(uid = uid_list)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
+ m = self.loadMessage(line.message, uid = line.uid)
+ if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
+ if m.hasExpandMethod():
+ try:
+ count += len(m.getObjectList(activity_tool))
+ except:
+ # Here, simply ignore an exception. The same exception should be handled later.
+ LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
+ pass
+ else:
+ count += 1
+ message_list.append(m)
+ uid_list_list.append(uid_list)
+ priority_list.append(line.priority)
+ if count >= MAX_GROUPED_OBJECTS:
+ break
+
+ # Release locks before starting a potentially long calculation
+ get_transaction().commit()
+ # Try to invoke
+ if group_method_id is not None:
+ LOG('SQLDict', TRACE,
+ 'invoking a group method %s with %d objects '\
+ ' (%d objects in expanded form)' % (
+ group_method_id, len(message_list), count))
+ activity_tool.invokeGroup(group_method_id, message_list)
+ else:
+ activity_tool.invoke(message_list[0])
+
+ # Check if messages are executed successfully.
+ # When some of them are executed successfully, it may not be acceptable to
+ # abort the transaction, because these remain pending, only due to other
+ # invalid messages. This means that a group method should not be used if
+ # it has a side effect. For now, only indexing uses a group method, and this
+ # has no side effect.
+ for m in message_list:
+ if m.is_executed:
+ break
+ else:
+ get_transaction().abort()
+
+ for i in xrange(len(message_list)):
+ m = message_list[i]
+ uid_list = uid_list_list[i]
+ priority = priority_list[i]
+ if m.is_executed:
+ activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
+ get_transaction().commit() # If successful, commit
+ if m.active_process:
+ active_process = activity_tool.unrestrictedTraverse(m.active_process)
+ if not active_process.hasActivity():
+ # No more activity
+ m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
+ else:
+ if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
+ # If this is a conflict error, do not lower the priority but only delay.
+ activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+ retry = 1)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
+ elif priority > MAX_PRIORITY:
+ # This is an error
+ if len(uid_list) > 0:
+ activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
+ # Assign message back to 'error' state
+ m.notifyUser(activity_tool) # Notify Error
+ get_transaction().commit() # and commit
else:
- count = 1
-
- group_method = activity_tool.restrictedTraverse(group_method_id)
-
- if count < MAX_GROUPED_OBJECTS:
- # Retrieve objects which have the same group method.
- result = activity_tool.SQLDict_readMessage(processing_node = processing_node, priority = priority,
- to_date = now_date, group_method_id = group_method_id,
- order_validation_text = order_validation_text)
- #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
- for line in result:
- path = line.path
- method_id = line.method_id
- uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
- processing_node = None, to_date = now_date,
- order_validation_text = order_validation_text)
- uid_list = [x.uid for x in uid_list]
- if len(uid_list) > 0:
- # Set selected messages to processing
- activity_tool.SQLDict_processMessage(uid = uid_list)
+ # Lower priority
+ if len(uid_list) > 0:
+ activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
+ priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
- m = self.loadMessage(line.message, uid = line.uid)
- if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
- if m.hasExpandMethod():
- try:
- count += len(m.getObjectList(activity_tool))
- except:
- # Here, simply ignore an exception. The same exception should be handled later.
- LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
- pass
- else:
- count += 1
- message_list.append(m)
- uid_list_list.append(uid_list)
- priority_list.append(line.priority)
- if count >= MAX_GROUPED_OBJECTS:
- break
-
- # Release locks before starting a potentially long calculation
- get_transaction().commit()
- # Try to invoke
- if group_method_id is not None:
- LOG('SQLDict', TRACE,
- 'invoking a group method %s with %d objects '\
- ' (%d objects in expanded form)' % (
- group_method_id, len(message_list), count))
- activity_tool.invokeGroup(group_method_id, message_list)
- else:
- activity_tool.invoke(message_list[0])
-
- # Check if messages are executed successfully.
- # When some of them are executed successfully, it may not be acceptable to
- # abort the transaction, because these remain pending, only due to other
- # invalid messages. This means that a group method should not be used if
- # it has a side effect. For now, only indexing uses a group method, and this
- # has no side effect.
- for m in message_list:
- if m.is_executed:
- break
- else:
- get_transaction().abort()
-
- for i in xrange(len(message_list)):
- m = message_list[i]
- uid_list = uid_list_list[i]
- priority = priority_list[i]
- if m.is_executed:
- activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
- get_transaction().commit() # If successful, commit
- if m.active_process:
- active_process = activity_tool.unrestrictedTraverse(m.active_process)
- if not active_process.hasActivity():
- # No more activity
- m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
- else:
- if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
- # If this is a conflict error, do not lower the priority but only delay.
- activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
- retry = 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
- elif priority > MAX_PRIORITY:
- # This is an error
- if len(uid_list) > 0:
- activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
- # Assign message back to 'error' state
- m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
- else:
- # Lower priority
- if len(uid_list) > 0:
- activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
- priority = priority + 1, retry = 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
-
- return 0
- get_transaction().commit() # Release locks before starting a potentially long calculation
+
+ return 0
+ get_transaction().commit() # Release locks before starting a potentially long calculation
return 1
def hasActivity(self, activity_tool, object, **kw):
- if hasattr(activity_tool,'SQLDict_readMessageList'):
+ hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
+ if hasMessage is not None:
if object is not None:
my_object_path = '/'.join(object.getPhysicalPath())
- result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw)
+ result = hasMessage(path=my_object_path, **kw)
if len(result) > 0:
return result[0].message_count > 0
else:
@@ -369,7 +373,8 @@
path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {}
- if hasattr(activity_tool,'SQLDict_readMessageList'):
+ readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
+ if readMessageList is not None:
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
@@ -391,8 +396,8 @@
raise ActivityFlushError, (
'The document %s does not exist' % path)
# Parse each message in SQL dict
- result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,
- processing_node=None,include_processing=0)
+ result = readMessageList(path=path, method_id=method_id,
+ processing_node=None,include_processing=0)
for line in result:
path = line.path
method_id = line.method_id
@@ -422,8 +427,10 @@
def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
# YO: reading all lines might cause a deadlock
message_list = []
- if hasattr(activity_tool,'SQLDict_readMessageList'):
- result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None,include_processing=include_processing)
+ readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
+ if readMessageList is not None:
+ result = readMessageList(path=None, method_id=None, processing_node=None,
+ to_processing_date=None,include_processing=include_processing)
for line in result:
m = self.loadMessage(line.message, uid = line.uid)
m.processing_node = line.processing_node
@@ -435,8 +442,9 @@
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
message_list = []
- if hasattr(activity_tool, 'SQLDict_dumpMessageList'):
- result = activity_tool.SQLDict_dumpMessageList()
+ dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
+ if dumpMessageList is not None:
+ result = dumpMessageList()
for line in result:
m = self.loadMessage(line.message, uid = line.uid)
message_list.append(m)
@@ -444,7 +452,8 @@
def distribute(self, activity_tool, node_count):
processing_node = 1
- if hasattr(activity_tool,'SQLDict_readMessageList'):
+ readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
+ if readMessageList is not None:
now_date = DateTime()
if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
# Sticky processing messages should be set back to non processing
@@ -452,9 +461,9 @@
self.max_processing_date = now_date
else:
max_processing_date = None
- result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1,
- to_processing_date = max_processing_date,
- include_processing=0) # Only assign non assigned messages
+ result = readMessageList(path=None, method_id=None, processing_node = -1,
+ to_processing_date = max_processing_date,
+ include_processing=0) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {}
for line in result:
@@ -534,7 +543,7 @@
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
-
+
def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id
if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
Modified: erp5/trunk/products/CMFActivity/Activity/SQLQueue.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLQueue.py?rev=8156&r1=8155&r2=8156&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLQueue.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLQueue.py Fri Jun 23 13:58:28 2006
@@ -74,72 +74,76 @@
activity_tool.SQLQueue_delMessage(uid = m.uid)
def dequeueMessage(self, activity_tool, processing_node):
- if hasattr(activity_tool,'SQLQueue_readMessageList'):
- now_date = DateTime()
- # Next processing date in case of error
- next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
- priority = random.choice(priority_weight)
- # Try to find a message at given priority level
- result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority,
- to_date=now_date)
- if len(result) == 0:
- # If empty, take any message
- result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None,to_date=now_date)
- if len(result) > 0:
- line = result[0]
- path = line.path
- method_id = line.method_id
- # Make sure message can not be processed anylonger
- activity_tool.SQLQueue_processMessage(uid=line.uid)
- get_transaction().commit() # Release locks before starting a potentially long calculation
- m = self.loadMessage(line.message)
- # Make sure object exists
- validation_state = m.validate(self, activity_tool)
- if validation_state is not VALID:
- if validation_state in (EXCEPTION, INVALID_PATH):
- if line.priority > MAX_PRIORITY:
- # This is an error
- activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
- # Assign message back to 'error' state
- #m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
- else:
- # Lower priority
- activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
+ readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
+ if readMessage is None:
+ return 1
+
+ now_date = DateTime()
+ # Next processing date in case of error
+ next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
+ priority = random.choice(priority_weight)
+ # Try to find a message at given priority level
+ result = readMessage(processing_node=processing_node, priority=priority,
+ to_date=now_date)
+ if len(result) == 0:
+ # If empty, take any message
+ result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
+ if len(result) > 0:
+ line = result[0]
+ path = line.path
+ method_id = line.method_id
+ # Make sure message can not be processed anylonger
+ activity_tool.SQLQueue_processMessage(uid=line.uid)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
+ m = self.loadMessage(line.message)
+ # Make sure object exists
+ validation_state = m.validate(self, activity_tool)
+ if validation_state is not VALID:
+ if validation_state in (EXCEPTION, INVALID_PATH):
+ if line.priority > MAX_PRIORITY:
+ # This is an error
+ activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
+ # Assign message back to 'error' state
+ #m.notifyUser(activity_tool) # Notify Error
+ get_transaction().commit() # and commit
else:
- # We do not lower priority for INVALID_ORDER errors but we do postpone execution
- activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
- priority = line.priority)
+ # Lower priority
+ activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
- # Try to invoke
- activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
- if m.is_executed: # Make sure message could be invoked
- activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
- get_transaction().commit() # If successful, commit
+ # We do not lower priority for INVALID_ORDER errors but we do postpone execution
+ activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
+ priority = line.priority)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
+ else:
+ # Try to invoke
+ activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
+ if m.is_executed: # Make sure message could be invoked
+ activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
+ get_transaction().commit() # If successful, commit
+ else:
+ get_transaction().abort() # If not, abort transaction and start a new one
+ if line.priority > MAX_PRIORITY:
+ # This is an error
+ activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
+ # Assign message back to 'error' state
+ m.notifyUser(activity_tool) # Notify Error
+ get_transaction().commit() # and commit
else:
- get_transaction().abort() # If not, abort transaction and start a new one
- if line.priority > MAX_PRIORITY:
- # This is an error
- activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
- # Assign message back to 'error' state
- m.notifyUser(activity_tool) # Notify Error
- get_transaction().commit() # and commit
- else:
- # Lower priority
- activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
- priority = line.priority + 1)
- get_transaction().commit() # Release locks before starting a potentially long calculation
- return 0
- get_transaction().commit() # Release locks before starting a potentially long calculation
+ # Lower priority
+ activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
+ priority = line.priority + 1)
+ get_transaction().commit() # Release locks before starting a potentially long calculation
+ return 0
+ get_transaction().commit() # Release locks before starting a potentially long calculation
return 1
def hasActivity(self, activity_tool, object, **kw):
- if hasattr(activity_tool,'SQLQueue_readMessageList'):
+ hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
+ if hasMessage is not None:
if object is not None:
my_object_path = '/'.join(object.getPhysicalPath())
- result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw)
+ result = hasMessage(path=my_object_path, **kw)
if len(result) > 0:
return result[0].message_count > 0
else:
@@ -158,7 +162,8 @@
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
"""
- if hasattr(activity_tool,'SQLQueue_readMessageList'):
+ readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
+ if readMessageList is not None:
#return # Do nothing here to precent overlocking
path = '/'.join(object_path)
# Parse each message in registered
@@ -168,7 +173,7 @@
activity_tool.unregisterMessage(self, m)
# Parse each message in SQL queue
#LOG('Flush', 0, str((path, invoke, method_id)))
- result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None)
+ result = readMessageList(path=path, method_id=method_id,processing_node=None)
#LOG('Flush', 0, str(len(result)))
method_dict = {}
for line in result:
@@ -202,8 +207,9 @@
def getMessageList(self, activity_tool, processing_node=None,**kw):
message_list = []
- if hasattr(activity_tool,'SQLQueue_readMessageList'):
- result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None)
+ readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
+ if readMessageList is not None:
+ result = readMessageList(path=None, method_id=None, processing_node=None)
for line in result:
m = self.loadMessage(line.message)
m.processing_node = line.processing_node
@@ -214,17 +220,19 @@
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
message_list = []
- if hasattr(activity_tool, 'SQLQueue_dumpMessageList'):
- result = activity_tool.SQLQueue_dumpMessageList()
+ dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
+ if dumpMessageList is not None:
+ result = dumpMessageList()
for line in result:
m = self.loadMessage(line.message, uid = line.uid)
message_list.append(m)
return message_list
-
+
def distribute(self, activity_tool, node_count):
processing_node = 1
- if hasattr(activity_tool,'SQLQueue_readMessageList'):
- result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
+ readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
+ if readMessageList is not None:
+ result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
#LOG('distribute count',0,str(len(result)) )
#LOG('distribute count',0,str(map(lambda x:x.uid, result)))
#get_transaction().commit() # Release locks before starting a potentially long calculation
@@ -296,7 +304,7 @@
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
-
+
def _validate_after_tag(self, activity_tool, message, value):
# Count number of occurances of tag
if type(value) == type(''):
@@ -305,7 +313,7 @@
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
-
+
def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id
if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2:
@@ -321,7 +329,7 @@
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
-
+
# Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node = None):
"""
More information about the Erp5-report
mailing list