[Neo-report] r2787 jm - in /trunk: neo/tests/threaded/__init__.py tools/matrix

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Jun 15 14:50:39 CEST 2011


Author: jm
Date: Wed Jun 15 14:50:38 2011
New Revision: 2787

Log:
tests: some cleanup in threaded.__init__

Modified:
    trunk/neo/tests/threaded/__init__.py
    trunk/tools/matrix

Modified: trunk/neo/tests/threaded/__init__.py
==============================================================================
--- trunk/neo/tests/threaded/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/threaded/__init__.py [iso-8859-1] Wed Jun 15 14:50:38 2011
@@ -20,7 +20,6 @@ import os, random, socket, sys, tempfile
 from collections import deque
 from functools import wraps
 from Queue import Queue, Empty
-from weakref import ref as weak_ref
 from mock import Mock
 import transaction, ZODB
 import neo.admin.app, neo.master.app, neo.storage.app
@@ -48,66 +47,68 @@ def getVirtualIp(server_type):
 
 class Serialized(object):
 
-    _global_lock = threading.Lock()
-    _global_lock.acquire()
-    # TODO: use something else than Queue, for inspection or editing
-    #       (e.g. we'd like to suspend nodes temporarily)
-    _lock_list = Queue()
-    _pdb = False
-    pending = 0
+    @classmethod
+    def init(cls):
+        cls._global_lock = threading.Lock()
+        cls._global_lock.acquire()
+        # TODO: use something else than Queue, for inspection or editing
+        #       (e.g. we'd like to suspend nodes temporarily)
+        cls._lock_list = Queue()
+        cls._pdb = False
+        cls.pending = 0
 
-    @staticmethod
-    def release(lock=None, wake_other=True, stop=False):
+    @classmethod
+    def release(cls, lock=None, wake_other=True, stop=False):
         """Suspend lock owner and resume first suspended thread"""
         if lock is None:
-            lock = Serialized._global_lock
+            lock = cls._global_lock
             if stop: # XXX: we should fix ClusterStates.STOPPING
-                Serialized.pending = None
+                cls.pending = None
             else:
-                Serialized.pending = 0
+                cls.pending = 0
         try:
             sys._getframe(1).f_trace.im_self.set_continue()
-            Serialized._pdb = True
+            cls._pdb = True
         except AttributeError:
             pass
-        q = Serialized._lock_list
+        q = cls._lock_list
         q.put(lock)
         if wake_other:
             q.get().release()
 
-    @staticmethod
-    def acquire(lock=None):
+    @classmethod
+    def acquire(cls, lock=None):
         """Suspend all threads except lock owner"""
         if lock is None:
-            lock = Serialized._global_lock
+            lock = cls._global_lock
         lock.acquire()
-        if Serialized.pending is None: # XXX
-            if lock is Serialized._global_lock:
-                Serialized.pending = 0
+        if cls.pending is None: # XXX
+            if lock is cls._global_lock:
+                cls.pending = 0
             else:
                 sys.exit()
-        if Serialized._pdb:
-            Serialized._pdb = False
+        if cls._pdb:
+            cls._pdb = False
             try:
                 sys.stdout.write(threading.currentThread().node_name)
             except AttributeError:
                 pass
             pdb(1)
 
-    @staticmethod
-    def tic(lock=None):
+    @classmethod
+    def tic(cls, lock=None):
         # switch to another thread
         # (the following calls are not supposed to be debugged into)
-        Serialized.release(lock); Serialized.acquire(lock)
+        cls.release(lock); cls.acquire(lock)
 
-    @staticmethod
-    def background():
+    @classmethod
+    def background(cls):
         try:
-            Serialized._lock_list.get(0).release()
+            cls._lock_list.get(0).release()
         except Empty:
             pass
 
-class SerializedEventManager(Serialized, EventManager):
+class SerializedEventManager(EventManager):
 
     _lock = None
     _timeout = 0
@@ -147,7 +148,7 @@ class SerializedEventManager(Serialized,
             #      before the first message is sent.
             # TODO: Detect where a message is sent to jump immediately to nodes
             #       that will do something.
-            self.tic(self._lock)
+            Serialized.tic(self._lock)
             if timeout != 0:
                 timeout = self._timeout
                 if timeout != 0 and Serialized.pending:
@@ -294,15 +295,13 @@ class NEOCluster(object):
     SocketConnector_send = staticmethod(SocketConnector.send)
     Storage__init__ = staticmethod(Storage.__init__)
 
-    _cluster = None
+    _patched = threading.Lock()
 
-    @classmethod
-    def patch(cls):
+    def _patch(cluster):
+        cls = cluster.__class__
+        if not cls._patched.acquire(0):
+            raise RuntimeError("Can't run several cluster at the same time")
         def makeClientConnection(self, addr):
-            # XXX: 'threading.currentThread()._cluster'
-            #      does not work for client. We could monkey-patch
-            #      ClientConnection instead of using a global variable.
-            cluster = cls._cluster()
             try:
                 real_addr = cluster.resolv(addr)
                 return cls.SocketConnector_makeClientConnection(self, real_addr)
@@ -314,11 +313,6 @@ class NEOCluster(object):
             return result
         # TODO: 'sleep' should 'tic' in a smart way, so that storages can be
         #       safely started even if the cluster isn't.
-        def sleep(seconds):
-            l = threading.currentThread().em._lock
-            while Serialized.pending:
-                Serialized.tic(l)
-            Serialized.tic(l)
         bootstrap.sleep = lambda seconds: None
         BaseConnection.checkTimeout = lambda self, t: None
         SocketConnector.makeClientConnection = makeClientConnection
@@ -328,7 +322,7 @@ class NEOCluster(object):
         Storage.setupLog = lambda *args, **kw: None
 
     @classmethod
-    def unpatch(cls):
+    def _unpatch(cls):
         bootstrap.sleep = time.sleep
         BaseConnection.checkTimeout = cls.BaseConnection_checkTimeout
         SocketConnector.makeClientConnection = \
@@ -337,6 +331,7 @@ class NEOCluster(object):
             cls.SocketConnector_makeListeningConnection
         SocketConnector.send = cls.SocketConnector_send
         Storage.setupLog = setupLog
+        cls._patched.release()
 
     def __init__(self, master_count=1, partitions=1, replicas=0,
                        adapter=os.getenv('NEO_TESTS_ADAPTER', 'BTree'),
@@ -405,7 +400,8 @@ class NEOCluster(object):
         self.neoctl = NeoCTL(self)
 
     def start(self, storage_list=None, fast_startup=True):
-        self.__class__._cluster = weak_ref(self)
+        self._patch()
+        Serialized.init()
         for node_type in 'master', 'admin':
             for node in getattr(self, node_type + '_list'):
                 node.start()
@@ -448,7 +444,7 @@ class NEOCluster(object):
                         node.join()
         finally:
             Serialized.acquire()
-        self.__class__._cluster = None
+        self._unpatch()
 
     def tic(self, force=False):
         if force:
@@ -483,11 +479,3 @@ class NEOThreadedTest(NeoUnitTestBase):
     def setupLog(self):
         log_file = os.path.join(getTempDirectory(), self.id() + '.log')
         setupLog(LoggerThreadName(), log_file, True)
-
-    def setUp(self):
-        NeoUnitTestBase.setUp(self)
-        NEOCluster.patch()
-
-    def tearDown(self):
-        NEOCluster.unpatch()
-        NeoUnitTestBase.tearDown(self)

Modified: trunk/tools/matrix
==============================================================================
--- trunk/tools/matrix [iso-8859-1] (original)
+++ trunk/tools/matrix [iso-8859-1] Wed Jun 15 14:50:38 2011
@@ -45,16 +45,8 @@ class MatrixImportBenchmark(BenchmarkRun
         if storages[-1] < max_s:
             storages.append(max_s)
         replicas = range(min_r, max_r + 1)
-        if self._config.threaded:
-            from neo.tests.threaded import NEOCluster
-            NEOCluster.patch() # XXX ugly
-        try:
-            result_list = [self.runMatrix(storages, replicas)
-                           for x in xrange(self._config.repeat)]
-        finally:
-            if self._config.threaded:
-                from neo.tests.threaded import NEOCluster
-                NEOCluster.unpatch()# XXX ugly
+        result_list = [self.runMatrix(storages, replicas)
+                       for x in xrange(self._config.repeat)]
         results = {}
         for s in storages:
             results[s] = z = {}
@@ -84,7 +76,7 @@ class MatrixImportBenchmark(BenchmarkRun
             datafs = 'PROD1'
             import random, neo.tests.stat_zodb
             dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
-                random.Random(0)).as_storage(10000)
+                random.Random(0)).as_storage(100)
         print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
                 datafs, masters, storages, replicas, partitions)
         # cluster




More information about the Neo-report mailing list