[Erp5-report] r35687 jm - in /erp5/trunk/products: ERP5/tests/ ERP5Type/tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu May 27 17:03:01 CEST 2010
Author: jm
Date: Thu May 27 17:02:55 2010
New Revision: 35687
URL: http://svn.erp5.org?rev=35687&view=rev
Log:
Add unit test to check that invalidations sent by ZEO are processed in time
Modified:
erp5/trunk/products/ERP5/tests/testInvalidationBug.py
erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py
Modified: erp5/trunk/products/ERP5/tests/testInvalidationBug.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testInvalidationBug.py?rev=35687&r1=35686&r2=35687&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/tests/testInvalidationBug.py [utf8] (original)
+++ erp5/trunk/products/ERP5/tests/testInvalidationBug.py [utf8] Thu May 27 17:02:55 2010
@@ -28,15 +28,16 @@
#
##############################################################################
+import os
+import threading
+import time
import unittest
-import os
-
+import urllib
import transaction
-
+from DateTime import DateTime
from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
-from DateTime import DateTime
class TestInvalidationBug(ERP5TypeTestCase):
@@ -91,6 +92,81 @@
self.assertEqual(result_list[-2], [1,0]) # catalog
self.assertEqual(result_list[-1], [1,1]) # activity tables last
+ def testLateInvalidationFromZEO(self):
+ ### Check unit test is run properly
+ from ZEO.ClientStorage import ClientStorage
+ storage = self.portal._p_jar._storage
+ activity_tool = self.portal.portal_activities
+ node_list = list(activity_tool.getProcessingNodeList())
+ node_list.remove(activity_tool.getCurrentNode())
+ assert node_list and isinstance(storage, ClientStorage), \
+ "this unit test must be run with at least 2 ZEO clients"
+
+ ### Prepare unit test, to minimize amount of work during critical section
+ ## url to create some content using another zope
+ new_content_url = "http://ERP5TypeTestCase:@%s%s/newContent" % (
+ node_list[0], self.portal.organisation_module.getPath())
+ ## prepare freeze/unfreeze of ZEO storage
+ from asyncore import socket_map
+ zeo_connection = storage._connection
+ freeze_lock = threading.Lock()
+ freeze_lock.acquire()
+ def unfreezeStorage():
+ socket_map[zeo_connection.fileno()] = zeo_connection
+ # wake up asyncore loop to take the new socket into account
+ zeo_connection._pull_trigger()
+ # link to ZEO will be unfrozen 1 second after we read 'message' table
+ unfreeze_timer = threading.Timer(1, unfreezeStorage)
+ unfreeze_timer.setDaemon(True)
+ ## prepare monkey-patches (with code to revert them)
+ from Products.CMFActivity.Activity.SQLDict import SQLDict
+ zeo_server = storage._server
+ def unpatch():
+ storage._server = zeo_server
+ SQLDict.getProcessableMessageList = SQLDict_getProcessableMessageList
+ SQLDict_getProcessableMessageList = SQLDict.getProcessableMessageList
+ def getProcessableMessageList(*args, **kw):
+ result = SQLDict_getProcessableMessageList(*args, **kw)
+ unpatch()
+ unfreeze_timer.start()
+ return result
+
+ ### Perform unit test
+ ## we should start without any pending activity
+ self.assertNoPendingMessage()
+ ## monkey-patch ...
+ SQLDict.getProcessableMessageList = getProcessableMessageList
+ try:
+ # prevent nodes from processing activities automatically
+ activity_tool.manage_removeFromProcessingList(node_list)
+ transaction.commit()
+ del socket_map[zeo_connection.fileno()]
+ try:
+ # wake up asyncore loop and wait we really woke up
+ zeo_connection.trigger.pull_trigger(freeze_lock.release)
+ freeze_lock.acquire()
+ # make sure ZODB is not accessed until we get a message to process
+ storage._server = None
+ # ... monkey-patch done
+ ## create object
+ urllib.urlopen(new_content_url).read()
+ ## validate reindex activity
+ activity_tool.distribute()
+ self.assertEqual(1, len(activity_tool.getMessageList()))
+ ## reindex created object
+ activity_tool.tic()
+ finally:
+ try:
+ unfreeze_timer.join()
+ except RuntimeError:
+ unfreezeStorage()
+ finally:
+ unpatch()
+ activity_tool.manage_addToProcessingList(node_list)
+ transaction.commit()
+ ## When the bug is not fixed, we get a -3 failed activity
+ self.assertNoPendingMessage()
+
def _testReindex(self):
print("To reproduce bugs easily, distribution step should be skipped for"
" SQLDict, by writing messages with processing_node already at 0."
Modified: erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py?rev=35687&r1=35686&r2=35687&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py [utf8] (original)
+++ erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py [utf8] Thu May 27 17:02:55 2010
@@ -113,6 +113,20 @@
else:
activity_tool.manage_removeFromProcessingList((currentNode,))
+ def assertNoPendingMessage(self):
+ """Get the last error message from error_log"""
+ message_list = self.portal.portal_activities.getMessageList()
+ if message_list:
+ error_message = 'These messages are pending: %r' % [
+ ('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
+ for m in message_list]
+ error_log = self.portal.error_log._getLog()
+ if len(error_log):
+ error_message += '\nLast error message:' \
+ '\n%(type)s\n%(value)s\n%(tb_text)s' \
+ % error_log[-1]
+ raise self.fail(error_message)
+
def tic(self, verbose=0):
"""Execute pending activities"""
portal_activities = self.getPortal().portal_activities
@@ -136,22 +150,12 @@
# This prevents an infinite loop.
count -= 1
if count == 0:
- # Get the last error message from error_log.
- error_message = ''
- error_log = self.portal.error_log._getLog()
- if len(error_log):
- last_log = error_log[-1]
- error_message = '\nLast error message:\n%s\n%s\n%s\n' % (
- last_log['type'],
- last_log['value'],
- last_log['tb_text'],
- )
- raise RuntimeError,\
- 'tic is looping forever. These messages are pending: %r %s' % (
- [('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
- for m in portal_activities.getMessageList()],
- error_message
- )
+ error_message = 'tic is looping forever. '
+ try:
+ self.assertNoPendingMessage()
+ except AssertionError, e:
+ error_message += str(e)
+ raise RuntimeError(error_message)
# This give some time between messages
if count % 10 == 0:
portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)
More information about the Erp5-report
mailing list