[Erp5-report] r19192 - in /erp5/trunk/products/CMFActivity: Activity/ skins/activity/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Feb 8 18:28:50 CET 2008
Author: vincent
Date: Fri Feb 8 18:28:50 2008
New Revision: 19192
URL: http://svn.erp5.org?rev=19192&view=rev
Log:
Instead of deleting duplicate messages, reserve them, and delete them when the "original" message has succeeded.
Added:
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql
Removed:
erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql
Modified:
erp5/trunk/products/CMFActivity/Activity/SQLDict.py
Modified: erp5/trunk/products/CMFActivity/Activity/SQLDict.py
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/Activity/SQLDict.py?rev=19192&r1=19191&r2=19192&view=diff
==============================================================================
--- erp5/trunk/products/CMFActivity/Activity/SQLDict.py (original)
+++ erp5/trunk/products/CMFActivity/Activity/SQLDict.py Fri Feb 8 18:28:50 2008
@@ -178,20 +178,35 @@
if len(uid_list):
activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
- def deleteDuplicatedLineList(self, activity_tool, date, processing_node, line):
- """
- Delete all messages matching given one except itself.
- Operator Value
- != uid
- <= date
- = path, method_id, group_method_id, order_validation_text,
- processing_node, tag
- """
- activity_tool.SQLDict_deleteDuplicatedMessageList(
- processing_node=processing_node, uid=line.uid,
- path=line.path, method_id=line.method_id,
- group_method_id=line.group_method_id,
- order_validation_text=line.order_validation_text)
+ def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
+ """
+ Reserve unreserved messages matching given line.
+ Return their uids.
+ """
+ try:
+ result = activity_tool.SQLDict_selectDuplicatedLineList(
+ path=line.path,
+ method_id=line.method_id,
+ group_method_id=line.group_method_id,
+ order_validation_text=line.order_validation_text
+ )
+ uid_list = [x.uid for x in result]
+ if len(uid_list):
+ activity_tool.SQLDict_reserveDuplicatedLineList(
+ processing_node=processing_node,
+ uid_list=uid_list
+ )
+ else:
+ # Release locks
+ activity_tool.SQLDict_commit()
+ except:
+ # Log
+ LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info())
+ # Release lock
+ activity_tool.SQLDict_rollback()
+ # And re-raise
+ raise
+ return uid_list
def getProcessableMessageList(self, activity_tool, processing_node):
"""
@@ -217,13 +232,14 @@
unclean state.
Returned values:
- 3-tuple:
+ 4-tuple:
- list of 3-tuple:
- message uid
- message
- priority
- impacted object count
- group_method_id
+ - uid_to_duplicate_uid_list_dict
"""
def getReservedMessageList(**kw):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
@@ -233,9 +249,12 @@
if len(line_list):
LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
return line_list
- def deleteDuplicatedLineList(line):
- self.deleteDuplicatedLineList(activity_tool=activity_tool, date=now_date,
- processing_node=processing_node, line=line)
+ def getDuplicateMessageUidList(line):
+ uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
+ line=line, processing_node=processing_node)
+ if len(uid_list):
+ LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
+ return uid_list
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
@@ -248,14 +267,14 @@
group_method_id = None
try:
result = getReservedMessageList(limit=1)
+ uid_to_duplicate_uid_list_dict = {}
if len(result) > 0:
line = result[0]
m = self.loadMessage(line.message, uid=line.uid)
append(line, m)
group_method_id = line.group_method_id
- # Delete all messages matching current one - except current one.
- deleteDuplicatedLineList(line)
activity_tool.SQLDict_processMessage(uid=[line.uid])
+ uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
if group_method_id not in (None, '', '\0'):
# Count the number of objects to prevent too many objects.
count += len(m.getObjectList(activity_tool))
@@ -272,11 +291,12 @@
# So what remains to be filtered on are path, method_id,
# order_validation_text, tag
key = (line.path, line.method_id, line.order_validation_text, line.tag)
- if key in path_and_method_id_dict:
- LOG('SQLDict', TRACE, 'Duplicate of message %r has been skipped (it should already be deleted anyway): %r' % (path_and_method_id_dict[key], line.uid))
+ original_uid = path_and_method_id_dict.get(key)
+ if original_uid is not None:
+ uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
- deleteDuplicatedLineList(line)
+ uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS:
m = self.loadMessage(line.message, uid=line.uid)
count += len(m.getObjectList(activity_tool))
@@ -286,7 +306,7 @@
activity_tool.SQLDict_processMessage(uid=[x[0] for x in message_list])
# Unreserve extra messages as soon as possible.
makeMessageListAvailable(unreserve_uid_list)
- return message_list, count, group_method_id
+ return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
except:
LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
if len(message_list):
@@ -300,9 +320,9 @@
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
else:
LOG('SQLDict', TRACE, '(no message was reserved)')
- return [], 0, None
-
- def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
+ return [], 0, None, {}
+
+ def finalizeMessageExecution(self, activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict):
"""
If everything was fine, delete all messages.
If anything failed, make successfull messages available (if any), and
@@ -326,11 +346,18 @@
if m.is_executed:
if something_failed:
make_available_uid_list.append(uid)
+ make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
else:
deletable_uid_list.append(uid)
+ deletable_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
if m.active_process:
+ # XXX: Bug here: Even if a duplicate message has an active_process,
+ # it won't be called on the duplicate.
message_with_active_process_list.append(m)
else:
+ # Whatever happens, duplicate uids are to be made available. Only
+ # executed message will get to lower priority or error state.
+ make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
exc_type = m.exc_info[0]
if type(exc_type) is ClassType and \
issubclass(exc_type, ConflictError):
@@ -388,9 +415,14 @@
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
- def makeMessageListAvailable(uid_list):
- self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
- message_uid_priority_list, count, group_method_id = \
+ def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
+ final_uid_list = []
+ for uid in uid_list:
+ final_uid_list.append(uid)
+ final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
+ self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
+ message_uid_priority_list, count, group_method_id, \
+ uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
if len(message_uid_priority_list):
# Remove group_id parameter from group_method_id
@@ -425,7 +457,7 @@
raise
to_free_uid_list = [x[0] for x in message_uid_priority_list]
try:
- makeMessageListAvailable(to_free_uid_list)
+ makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
@@ -451,12 +483,12 @@
x[1].is_executed = 0
failed_message_uid_list = [x[0] for x in message_uid_priority_list]
try:
- makeMessageListAvailable(failed_message_uid_list)
+ makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (failed_message_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (failed_message_uid_list, ))
- self.finalizeMessageExecution(activity_tool, message_uid_priority_list)
+ self.finalizeMessageExecution(activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict)
get_transaction().commit()
return not len(message_uid_priority_list)
Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql?rev=19192&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_commit.zsql Fri Feb 8 18:28:50 2008
@@ -1,0 +1,11 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:1000
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params></params>
+COMMIT
Removed: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql?rev=19191&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql (original)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql (removed)
@@ -1,28 +1,0 @@
-<dtml-comment>
-title:
-connection_id:cmf_activity_sql_connection
-max_rows:0
-max_cache:0
-cache_time:0
-class_name:
-class_file:
-</dtml-comment>
-<params>
-processing_node
-uid
-path
-method_id
-group_method_id
-order_validation_text
-</params>
-DELETE FROM
- message
-WHERE
- processing_node IN (0, <dtml-sqlvar processing_node type="int">)
- AND uid != <dtml-sqlvar uid type="int">
- AND path = <dtml-sqlvar path type="string">
- AND method_id = <dtml-sqlvar method_id type="string">
- AND group_method_id = <dtml-sqlvar group_method_id type="string">
- AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
-<dtml-var sql_delimiter>
-COMMIT
Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql?rev=19192&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql Fri Feb 8 18:28:50 2008
@@ -1,0 +1,21 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_file:
+</dtml-comment>
+<params>
+processing_node
+uid_list
+</params>
+UPDATE
+ message
+SET
+ processing_node=<dtml-sqlvar processing_node type="int">
+WHERE
+ uid IN (<dtml-in uid_list><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>)
+<dtml-var sql_delimiter>
+COMMIT
Added: erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql
URL: http://svn.erp5.org/erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql?rev=19192&view=auto
==============================================================================
--- erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql (added)
+++ erp5/trunk/products/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql Fri Feb 8 18:28:50 2008
@@ -1,0 +1,24 @@
+<dtml-comment>
+title:
+connection_id:cmf_activity_sql_connection
+max_rows:0
+max_cache:0
+cache_time:0
+class_name:
+class_
+</dtml-comment>
+<params>
+path
+method_id
+group_method_id
+order_validation_text
+</params>
+SELECT uid FROM
+ message
+WHERE
+ processing_node = 0
+ AND path = <dtml-sqlvar path type="string">
+ AND method_id = <dtml-sqlvar method_id type="string">
+ AND group_method_id = <dtml-sqlvar group_method_id type="string">
+ AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
+FOR UPDATE
More information about the Erp5-report
mailing list