[Neo-report] r2665 jm - /trunk/neo/tests/functional/__init__.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Mar 17 12:13:40 CET 2011
Author: jm
Date: Thu Mar 17 12:13:40 2011
New Revision: 2665
Log:
Partially fix race conditions when allocating ports
This fixes errors like:
- ConnectorException: makeListeningConnection on ('127.0.0.1', 33157) failed: 98:Address already in use
Error executing './neomaster ...
- "error: [Errno 22] Invalid argument" in __allocatePort
A system-wide lock is used to prevent conflicts when several NEO test suites are
run simultaneously.
However, errors can still happen if other software tries to allocate ports.
Allocated ports are kept open as long as possible to minimize the probability
of this happening: they are closed just before spawning subprocesses.
Modified:
trunk/neo/tests/functional/__init__.py
Modified: trunk/neo/tests/functional/__init__.py
==============================================================================
--- trunk/neo/tests/functional/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/__init__.py [iso-8859-1] Thu Mar 17 12:13:40 2011
@@ -32,7 +32,7 @@ from neo.neoctl.neoctl import NeoCTL, No
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
from neo.lib.util import dump, SOCKET_CONNECTORS_DICT
from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase, buildUrlFromString, \
- ADDRESS_TYPE, IP_VERSION_FORMAT_DICT
+ ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, SocketLock
from neo.client.Storage import Storage
NEO_MASTER = 'neomaster'
@@ -54,6 +54,39 @@ class AlreadyStopped(Exception):
class NotFound(Exception):
pass
+class PortAllocator(object):
+
+ lock = SocketLock('neo.PortAllocator')
+ allocator_set = set()
+
+ def __init__(self):
+ self.socket_list = []
+
+ def allocate(self, address_type, local_ip):
+ s = socket.socket(address_type, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ if not self.lock.locked():
+ self.lock.acquire()
+ self.allocator_set.add(self)
+ self.socket_list.append(s)
+ s.bind((local_ip, 0))
+ return s.getsockname()[1]
+
+ def release(self):
+ for s in self.socket_list:
+ s.close()
+ self.socket_list = None
+
+ def reset(self):
+ if self.lock.locked():
+ self.allocator_set.discard(self)
+ if not self.allocator_set:
+ self.lock.release()
+ if self.socket_list:
+ for s in self.socket_list:
+ s.close()
+ self.__init__()
+
class NEOProcess(object):
pid = 0
@@ -185,18 +218,19 @@ class NEOCluster(object):
self.db_password = db_password
self.db_list = db_list
self.address_type = address_type
- self.local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
+ self.local_ip = local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
if clear_databases:
self.setupDB()
self.process_dict = {}
- self.port_set = set()
if temp_dir is None:
temp_dir = tempfile.mkdtemp(prefix='neo_')
print 'Using temp directory %r.' % (temp_dir, )
self.temp_dir = temp_dir
- admin_port = self.__allocatePort()
+ self.port_allocator = PortAllocator()
+ admin_port = self.port_allocator.allocate(address_type, local_ip)
self.cluster_name = 'neo_%s' % (random.randint(0, 100), )
- master_node_list = [self.__allocatePort() for i in xrange(master_node_count)]
+ master_node_list = [self.port_allocator.allocate(address_type, local_ip)
+ for i in xrange(master_node_count)]
self.master_nodes = '/'.join('%s:%s' % (
buildUrlFromString(self.local_ip), x, )
for x in master_node_list)
@@ -246,19 +280,6 @@ class NEOCluster(object):
self.process_dict.setdefault(command, []).append(
NEOProcess(command, uuid, arguments))
- def __allocatePort(self):
- port_set = self.port_set
- s = socket.socket(self.address_type, socket.SOCK_STREAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- while True:
- s.bind((self.local_ip, 0))
- port = s.getsockname()[1]
- if port not in port_set:
- break
- s.close()
- port_set.add(port)
- return port
-
def __allocateUUID(self):
uuid = os.urandom(16)
self.uuid_set.add(uuid)
@@ -300,6 +321,7 @@ class NEOCluster(object):
def run(self, except_storages=()):
""" Start cluster processes except some storage nodes """
assert len(self.process_dict)
+ self.port_allocator.release()
for process_list in self.process_dict.itervalues():
for process in process_list:
if process not in except_storages:
@@ -315,6 +337,7 @@ class NEOCluster(object):
time.sleep(0.5)
else:
break
+ self.port_allocator.reset()
def start(self, except_storages=()):
""" Do a complete start of a cluster """
@@ -582,6 +605,10 @@ class NEOCluster(object):
def __del__(self):
if self.cleanup_on_delete:
os.removedirs(self.temp_dir)
+ try:
+ self.port_allocator.reset()
+ except AttributeError:
+ pass
class NEOFunctionalTest(NeoTestBase):
More information about the Neo-report
mailing list