[Erp5-report] r35687 jm - in /erp5/trunk/products: ERP5/tests/ ERP5Type/tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Thu May 27 17:03:01 CEST 2010


Author: jm
Date: Thu May 27 17:02:55 2010
New Revision: 35687

URL: http://svn.erp5.org?rev=35687&view=rev
Log:
Add unit test to check that invalidations sent by ZEO are processed in time

Modified:
    erp5/trunk/products/ERP5/tests/testInvalidationBug.py
    erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py

Modified: erp5/trunk/products/ERP5/tests/testInvalidationBug.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testInvalidationBug.py?rev=35687&r1=35686&r2=35687&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/tests/testInvalidationBug.py [utf8] (original)
+++ erp5/trunk/products/ERP5/tests/testInvalidationBug.py [utf8] Thu May 27 17:02:55 2010
@@ -28,15 +28,16 @@
 #
 ##############################################################################
 
+import os
+import threading
+import time
 import unittest
-import os
-
+import urllib
 import transaction
-
+from DateTime import DateTime
 from Testing import ZopeTestCase
 from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
 from Products.ERP5Type.tests.utils import createZODBPythonScript
-from DateTime import DateTime
 
 class TestInvalidationBug(ERP5TypeTestCase):
 
@@ -91,6 +92,81 @@
     self.assertEqual(result_list[-2], [1,0]) # catalog
     self.assertEqual(result_list[-1], [1,1]) # activity tables last
 
+  def testLateInvalidationFromZEO(self):
+    ### Check unit test is run properly
+    from ZEO.ClientStorage import ClientStorage
+    storage = self.portal._p_jar._storage
+    activity_tool = self.portal.portal_activities
+    node_list = list(activity_tool.getProcessingNodeList())
+    node_list.remove(activity_tool.getCurrentNode())
+    assert node_list and isinstance(storage, ClientStorage), \
+      "this unit test must be run with at least 2 ZEO clients"
+
+    ### Prepare unit test, to minimize amount of work during critical section
+    ## url to create some content using another zope
+    new_content_url = "http://ERP5TypeTestCase:@%s%s/newContent" % (
+      node_list[0], self.portal.organisation_module.getPath())
+    ## prepare freeze/unfreeze of ZEO storage
+    from asyncore import socket_map
+    zeo_connection = storage._connection
+    freeze_lock = threading.Lock()
+    freeze_lock.acquire()
+    def unfreezeStorage():
+      socket_map[zeo_connection.fileno()] = zeo_connection
+      # wake up asyncore loop to take the new socket into account
+      zeo_connection._pull_trigger()
+    # link to ZEO will be unfrozen 1 second after we read 'message' table
+    unfreeze_timer = threading.Timer(1, unfreezeStorage)
+    unfreeze_timer.setDaemon(True)
+    ## prepare monkey-patches (with code to revert them)
+    from Products.CMFActivity.Activity.SQLDict import SQLDict
+    zeo_server = storage._server
+    def unpatch():
+      storage._server = zeo_server
+      SQLDict.getProcessableMessageList = SQLDict_getProcessableMessageList
+    SQLDict_getProcessableMessageList = SQLDict.getProcessableMessageList
+    def getProcessableMessageList(*args, **kw):
+      result = SQLDict_getProcessableMessageList(*args, **kw)
+      unpatch()
+      unfreeze_timer.start()
+      return result
+
+    ### Perform unit test
+    ## we should start without any pending activity
+    self.assertNoPendingMessage()
+    ## monkey-patch ...
+    SQLDict.getProcessableMessageList = getProcessableMessageList
+    try:
+      # prevent nodes from processing activities automatically
+      activity_tool.manage_removeFromProcessingList(node_list)
+      transaction.commit()
+      del socket_map[zeo_connection.fileno()]
+      try:
+        # wake up asyncore loop and wait we really woke up
+        zeo_connection.trigger.pull_trigger(freeze_lock.release)
+        freeze_lock.acquire()
+        # make sure ZODB is not accessed until we get a message to process
+        storage._server = None
+        # ... monkey-patch done
+        ## create object
+        urllib.urlopen(new_content_url).read()
+        ## validate reindex activity
+        activity_tool.distribute()
+        self.assertEqual(1, len(activity_tool.getMessageList()))
+        ## reindex created object
+        activity_tool.tic()
+      finally:
+        try:
+          unfreeze_timer.join()
+        except RuntimeError:
+          unfreezeStorage()
+    finally:
+      unpatch()
+      activity_tool.manage_addToProcessingList(node_list)
+      transaction.commit()
+    ## When the bug is not fixed, we get a -3 failed activity
+    self.assertNoPendingMessage()
+
   def _testReindex(self):
     print("To reproduce bugs easily, distribution step should be skipped for"
           " SQLDict, by writing messages with processing_node already at 0."

Modified: erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py?rev=35687&r1=35686&r2=35687&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py [utf8] (original)
+++ erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py [utf8] Thu May 27 17:02:55 2010
@@ -113,6 +113,20 @@
     else:
       activity_tool.manage_removeFromProcessingList((currentNode,))
 
+  def assertNoPendingMessage(self):
+    """Get the last error message from error_log"""
+    message_list = self.portal.portal_activities.getMessageList()
+    if message_list:
+      error_message = 'These messages are pending: %r' % [
+          ('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
+          for m in message_list]
+      error_log = self.portal.error_log._getLog()
+      if len(error_log):
+        error_message += '\nLast error message:' \
+                         '\n%(type)s\n%(value)s\n%(tb_text)s' \
+                         % error_log[-1]
+      raise self.fail(error_message)
+
   def tic(self, verbose=0):
     """Execute pending activities"""
     portal_activities = self.getPortal().portal_activities
@@ -136,22 +150,12 @@
         # This prevents an infinite loop.
         count -= 1
         if count == 0:
-          # Get the last error message from error_log.
-          error_message = ''
-          error_log = self.portal.error_log._getLog()
-          if len(error_log):
-            last_log = error_log[-1]
-            error_message = '\nLast error message:\n%s\n%s\n%s\n' % (
-              last_log['type'],
-              last_log['value'],
-              last_log['tb_text'],
-              )
-          raise RuntimeError,\
-            'tic is looping forever. These messages are pending: %r %s' % (
-          [('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
-          for m in portal_activities.getMessageList()],
-          error_message
-          )
+          error_message = 'tic is looping forever. '
+          try:
+            self.assertNoPendingMessage()
+          except AssertionError, e:
+            error_message += str(e)
+          raise RuntimeError(error_message)
         # This give some time between messages
         if count % 10 == 0:
           portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)




More information about the Erp5-report mailing list