[Neo-report] r2665 jm - /trunk/neo/tests/functional/__init__.py

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Mar 17 12:13:40 CET 2011


Author: jm
Date: Thu Mar 17 12:13:40 2011
New Revision: 2665

Log:
Partially fix race conditions when allocating ports

This fixes errors like:
- ConnectorException: makeListeningConnection on ('127.0.0.1', 33157) failed: 98:Address already in use
  Error executing './neomaster ...
- "error: [Errno 22] Invalid argument" in __allocatePort

A system-wide lock is used to prevent conflicts when several NEO test suites are
run simultaneously.

However, errors could still happen if other software tries to allocate ports.
Allocated ports are kept open as long as possible to minimize the probability
of this happening: they are closed just before spawning subprocesses.

Modified:
    trunk/neo/tests/functional/__init__.py

Modified: trunk/neo/tests/functional/__init__.py
==============================================================================
--- trunk/neo/tests/functional/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/__init__.py [iso-8859-1] Thu Mar 17 12:13:40 2011
@@ -32,7 +32,7 @@ from neo.neoctl.neoctl import NeoCTL, No
 from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
 from neo.lib.util import dump, SOCKET_CONNECTORS_DICT
 from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase, buildUrlFromString, \
-        ADDRESS_TYPE, IP_VERSION_FORMAT_DICT
+        ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, SocketLock
 from neo.client.Storage import Storage
 
 NEO_MASTER = 'neomaster'
@@ -54,6 +54,39 @@ class AlreadyStopped(Exception):
 class NotFound(Exception):
     pass
 
+class PortAllocator(object):
+
+    lock = SocketLock('neo.PortAllocator')
+    allocator_set = set()
+
+    def __init__(self):
+        self.socket_list = []
+
+    def allocate(self, address_type, local_ip):
+        s = socket.socket(address_type, socket.SOCK_STREAM)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        if not self.lock.locked():
+            self.lock.acquire()
+        self.allocator_set.add(self)
+        self.socket_list.append(s)
+        s.bind((local_ip, 0))
+        return s.getsockname()[1]
+
+    def release(self):
+        for s in self.socket_list:
+            s.close()
+        self.socket_list = None
+
+    def reset(self):
+        if self.lock.locked():
+            self.allocator_set.discard(self)
+            if not self.allocator_set:
+                self.lock.release()
+            if self.socket_list:
+                for s in self.socket_list:
+                    s.close()
+            self.__init__()
+
 class NEOProcess(object):
     pid = 0
 
@@ -185,18 +218,19 @@ class NEOCluster(object):
         self.db_password = db_password
         self.db_list = db_list
         self.address_type = address_type
-        self.local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
+        self.local_ip = local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
         if clear_databases:
             self.setupDB()
         self.process_dict = {}
-        self.port_set = set()
         if temp_dir is None:
             temp_dir = tempfile.mkdtemp(prefix='neo_')
             print 'Using temp directory %r.' % (temp_dir, )
         self.temp_dir = temp_dir
-        admin_port = self.__allocatePort()
+        self.port_allocator = PortAllocator()
+        admin_port = self.port_allocator.allocate(address_type, local_ip)
         self.cluster_name = 'neo_%s' % (random.randint(0, 100), )
-        master_node_list = [self.__allocatePort() for i in xrange(master_node_count)]
+        master_node_list = [self.port_allocator.allocate(address_type, local_ip)
+                            for i in xrange(master_node_count)]
         self.master_nodes = '/'.join('%s:%s' % (
                 buildUrlFromString(self.local_ip), x, )
                 for x in master_node_list)
@@ -246,19 +280,6 @@ class NEOCluster(object):
         self.process_dict.setdefault(command, []).append(
             NEOProcess(command, uuid, arguments))
 
-    def __allocatePort(self):
-        port_set = self.port_set
-        s = socket.socket(self.address_type, socket.SOCK_STREAM)
-        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        while True:
-            s.bind((self.local_ip, 0))
-            port = s.getsockname()[1]
-            if port not in port_set:
-                break
-        s.close()
-        port_set.add(port)
-        return port
-
     def __allocateUUID(self):
         uuid = os.urandom(16)
         self.uuid_set.add(uuid)
@@ -300,6 +321,7 @@ class NEOCluster(object):
     def run(self, except_storages=()):
         """ Start cluster processes except some storage nodes """
         assert len(self.process_dict)
+        self.port_allocator.release()
         for process_list in self.process_dict.itervalues():
             for process in process_list:
                 if process not in except_storages:
@@ -315,6 +337,7 @@ class NEOCluster(object):
                 time.sleep(0.5)
             else:
                 break
+        self.port_allocator.reset()
 
     def start(self, except_storages=()):
         """ Do a complete start of a cluster """
@@ -582,6 +605,10 @@ class NEOCluster(object):
     def __del__(self):
         if self.cleanup_on_delete:
             os.removedirs(self.temp_dir)
+        try:
+            self.port_allocator.reset()
+        except AttributeError:
+            pass
 
 
 class NEOFunctionalTest(NeoTestBase):




More information about the Neo-report mailing list