[Erp5-report] r45694 jm - /erp5/trunk/products/ERP5Type/tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Apr 27 17:05:43 CEST 2011
Author: jm
Date: Wed Apr 27 17:05:43 2011
New Revision: 45694
URL: http://svn.erp5.org?rev=45694&view=rev
Log:
Review setup of activity load balancing in ZEO-based unit tests
Do not initialize '/test_processing_nodes' from ZEO server anymore, because this
would not work for NEO. As a result, conflict resolution on Application is
implemented and cleanup of list of activity nodes is done at shutdown.
Modified:
erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py
erp5/trunk/products/ERP5Type/tests/runUnitTest.py
Modified: erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py?rev=45694&r1=45693&r2=45694&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py [utf8] (original)
+++ erp5/trunk/products/ERP5Type/tests/ProcessingNodeTestCase.py [utf8] Wed Apr 27 17:05:43 2011
@@ -1,16 +1,36 @@
# -*- coding: utf-8 -*-
import base64, errno, os, select, socket, sys, time
from threading import Thread
+from UserDict import IterableUserDict
import Lifetime
import transaction
-from BTrees.OIBTree import OIBTree
from Testing import ZopeTestCase
+from ZODB.POSException import ConflictError
from zLOG import LOG, ERROR
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.tests import backportUnittest
from Products.ERP5Type.tests.utils import createZServer
+class DictPersistentWrapper(IterableUserDict, object):
+
+ def __metaclass__(name, base, d):
+ def wrap(attr):
+ wrapped = getattr(base[0], attr)
+ def wrapper(self, *args, **kw):
+ self._persistent_object._p_changed = 1
+ return wrapped(self, *args, **kw)
+ wrapper.__name__ = attr
+ return wrapper
+ for attr in ('clear', 'setdefault', 'update', '__setitem__', '__delitem__'):
+ d[attr] = wrap(attr)
+ return type(name, base, d)
+
+ def __init__(self, dict, persistent_object):
+ self.data = dict
+ self._persistent_object = persistent_object
+
+
def patchActivityTool():
"""Redefine several methods of ActivityTool for unit tests
"""
@@ -39,8 +59,8 @@ def patchActivityTool():
def getNodeDict(self):
app = self.getPhysicalRoot()
if getattr(app, 'test_processing_nodes', None) is None:
- app.test_processing_nodes = OIBTree()
- return app.test_processing_nodes
+ app.test_processing_nodes = {}
+ return DictPersistentWrapper(app.test_processing_nodes, app)
@patch
def getDistributingNode(self):
@@ -74,6 +94,25 @@ def patchActivityTool():
self._orig_tic(processing_node, force)
+def Application_resolveConflict(self, old_state, saved_state, new_state):
+ """Solve conflicts in case several nodes register at the same time
+ """
+ new_state = new_state.copy()
+ old, saved, new = [set(state.pop('test_processing_nodes', {}).items())
+ for state in old_state, saved_state, new_state]
+ if sorted(old_state.items()) != sorted(saved_state.items()):
+ raise ConflictError
+ new |= saved - old
+ new -= old - saved
+ new_state['test_processing_nodes'] = nodes = dict(new)
+ if len(nodes) != len(new):
+ raise ConflictError
+ return new_state
+
+from OFS.Application import Application
+Application._p_resolveConflict = Application_resolveConflict
+
+
class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
"""Minimal ERP5 TestCase class to process activities
@@ -131,7 +170,16 @@ class ProcessingNodeTestCase(backportUni
if processing:
activity_tool.manage_addToProcessingList((currentNode,))
else:
- activity_tool.manage_removeFromProcessingList((currentNode,))
+ activity_tool.manage_delNode((currentNode,))
+
+ @classmethod
+ def unregisterNode(cls):
+ if ZopeTestCase.utils._Z2HOST is not None:
+ self = cls('unregisterNode')
+ self.app = self._app()
+ self._registerNode(distributing=0, processing=0)
+ transaction.commit()
+ self._close()
def assertNoPendingMessage(self):
"""Get the last error message from error_log"""
Modified: erp5/trunk/products/ERP5Type/tests/runUnitTest.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/tests/runUnitTest.py?rev=45694&r1=45693&r2=45694&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/tests/runUnitTest.py [utf8] (original)
+++ erp5/trunk/products/ERP5Type/tests/runUnitTest.py [utf8] Wed Apr 27 17:05:43 2011
@@ -538,18 +538,6 @@ def runUnitTestList(test_list, verbosity
if run_only:
ERP5TypeTestLoader.filter_test_list = None
- if not isinstance(Storage, ClientStorage):
- # Remove nodes that were registered during previous execution.
- # Set an empty dict (instead of delete the property)
- # in order to avoid conflicts on / when several ZEO clients registers.
- from BTrees.OIBTree import OIBTree
- app = ZopeTestCase.app()
- app.test_processing_nodes = OIBTree()
- import transaction
- transaction.commit()
- ZopeTestCase.close(app)
- del app
-
if zeo_client_pid_list is None:
result = suite()
else:
@@ -558,6 +546,7 @@ def runUnitTestList(test_list, verbosity
_print('done (%.3fs)\n' % (time.time() - _start))
result = TestRunner(verbosity=verbosity).run(suite)
finally:
+ ProcessingNodeTestCase.unregisterNode()
Storage.close()
if zeo_client_pid_list is not None:
# Wait that child processes exit. Stop ZEO storage (if any) after all
More information about the Erp5-report
mailing list