[Neo-report] r2664 jm - /trunk/neo/tests/__init__.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Mar 17 12:13:31 CET 2011
Author: jm
Date: Thu Mar 17 12:13:30 2011
New Revision: 2664
Log:
tests: implement a basic system-wide lock
It will be used to allocate ports in a safer way.
Modified:
trunk/neo/tests/__init__.py
Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Thu Mar 17 12:13:30 2011
@@ -15,6 +15,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+import errno
import os
import random
import socket
@@ -28,7 +29,7 @@ from mock import Mock
from neo.lib import protocol
from neo.lib.protocol import Packets
from neo.lib.util import getAddressType
-from time import time, gmtime
+from time import time, gmtime, sleep
from struct import pack, unpack
DB_PREFIX = os.getenv('NEO_DB_PREFIX', 'test_neo_')
@@ -455,3 +456,41 @@ class DoNothingConnector(Mock):
# Accessor: hand back whatever is stored in this connector's `desc` attribute.
def getDescriptor(self):
return self.desc
class SocketLock(object):
    """Basic system-wide lock built on exclusive socket binding.

    The lock is considered held while this object owns a socket bound to
    ``address``: a second ``bind()`` on the same address fails with
    ``EADDRINUSE``, whichever process attempts it.  Closing the socket
    releases the lock.

    For ``AF_UNIX`` (the default family) the address is prefixed with a NUL
    byte, which on Linux selects the abstract socket namespace, so no file
    is created on disk.

    Instances can be used as context managers::

        with SocketLock('my-lock'):
            ...

    The interface mirrors ``threading.Lock`` (``acquire``/``release``/
    ``locked``).  A given instance is not reentrant.
    """

    # Class-level default, so that ``locked()`` works before the first
    # acquire() and after ``release()`` deletes the instance attribute.
    _socket = None

    def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
        """Remember where to bind; no socket is created yet.

        address -- address to bind to (a name for AF_UNIX, or whatever
                   format the chosen family expects)
        family  -- socket address family
        type    -- socket type (parameter name kept for compatibility with
                   existing keyword callers, although it shadows a builtin)
        """
        if family == socket.AF_UNIX:
            # Leading NUL byte: Linux abstract namespace.
            address = '\0' + address
        self.address = address
        self.socket_args = family, type

    def locked(self):
        """Return True if this instance currently holds the lock."""
        return self._socket is not None

    def acquire(self, blocking=1):
        """Try to take the lock.

        blocking -- if true (default), retry once per second until the
                    address becomes free; if false, give up immediately.

        Return True once the lock is held, or False if ``blocking`` is
        false and the address is already in use.  Any socket error other
        than EADDRINUSE is propagated to the caller.
        """
        assert self._socket is None
        s = socket.socket(*self.socket_args)
        try:
            while True:
                try:
                    s.bind(self.address)
                except socket.error as e:
                    # ``e.errno`` rather than ``e[0]``: exception indexing
                    # does not exist on Python 3.
                    if e.errno != errno.EADDRINUSE:
                        raise
                    if not blocking:
                        return False
                    sleep(1)
                else:
                    self._socket = s
                    return True
        finally:
            # Every exit path that did not take the lock (non-blocking
            # failure, unexpected error, interruption during sleep) must
            # not leak the socket.
            if self._socket is None:
                s.close()

    def release(self):
        """Release the lock by closing the bound socket.

        Must only be called while the lock is held by this instance.
        """
        s = self._socket
        # Reset the state first so locked() is False even if close() fails.
        del self._socket
        s.close()

    def __enter__(self):
        """Block until the lock is acquired, then return the lock itself."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the lock; exceptions from the body are not suppressed."""
        self.release()
More information about the Neo-report
mailing list