[Neo-report] r2725 jm - in /trunk: buildout/software-profiles/ neo/tests/ neo/tests/functi...
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Apr 20 20:21:00 CEST 2011
Author: jm
Date: Wed Apr 20 20:21:00 2011
New Revision: 2725
Log:
tests: extend bdb/rpdb2 debuggers to help debug a set of processes
This adds a dependency on 'psutil', which is required anyway to fix
NEOProcess.isAlive
Added:
trunk/neo/tests/cluster.py
Modified:
trunk/buildout/software-profiles/neo.cfg
trunk/neo/tests/__init__.py
trunk/neo/tests/functional/__init__.py
Modified: trunk/buildout/software-profiles/neo.cfg
==============================================================================
--- trunk/buildout/software-profiles/neo.cfg [iso-8859-1] (original)
+++ trunk/buildout/software-profiles/neo.cfg [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -24,7 +24,6 @@ eggs =
neostorage
neoclient
neomaster
+ # for unit tests
zope.testing
-
-
-
+ psutil
Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -16,7 +16,6 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import __builtin__
-import errno
import os
import random
import socket
@@ -30,9 +29,8 @@ from mock import Mock
from neo.lib import debug, logger, protocol
from neo.lib.protocol import Packets
from neo.lib.util import getAddressType
-from time import time, gmtime, sleep
+from time import time, gmtime
from struct import pack, unpack
-from functools import wraps
DB_PREFIX = os.getenv('NEO_DB_PREFIX', 'test_neo_')
DB_ADMIN = os.getenv('NEO_DB_ADMIN', 'root')
@@ -507,93 +505,5 @@ class DoNothingConnector(Mock):
def getDescriptor(self):
return self.desc
-class SocketLock(object):
- """Basic system-wide lock"""
- _socket = None
-
- def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
- if family == socket.AF_UNIX:
- address = '\0' + address
- self.address = address
- self.socket_args = family, type
-
- def locked(self):
- return self._socket is not None
-
- def acquire(self, blocking=1):
- assert self._socket is None
- s = socket.socket(*self.socket_args)
- try:
- while True:
- try:
- s.bind(self.address)
- except socket.error, e:
- if e[0] != errno.EADDRINUSE:
- raise
- if not blocking:
- return False
- sleep(1)
- else:
- self._socket = s
- return True
- finally:
- if self._socket is None:
- s.close()
-
- def release(self):
- s = self._socket
- del self._socket
- s.close()
-
-
-class ClusterPdb(object):
- # TODO: monkey-patch normal code not to timeout
- # if another node is being debugged
-
- def __init__(self):
- self._r, self._w = os.pipe()
- self.release(0)
-
- def __getattr__(self, attr):
- try:
- debugger = self.__dict__['_debugger']
- except KeyError:
- self._debugger = debugger = debug.getPdb()
- def hook(name):
- hook = getattr(self, name)
- hooked = getattr(debugger, name)
- def wrapper(*args, **kw):
- return hook(hooked, *args, **kw)
- setattr(debugger, name, wraps(hooked)(wrapper))
- hook('interaction')
- return getattr(debugger, attr)
-
- def acquire(self):
- return unpack('d', os.read(self._r, 8))[0]
-
- def release(self, delay):
- os.write(self._w, pack('d', delay))
-
- def sync(self):
- """Sleep as long as another process owns the lock"""
- delay = self.acquire()
- self.release(delay)
- return delay
-
- def interaction(self, hooked, *args, **kw):
- delay = self.acquire() - time()
- try:
- return hooked(*args, **kw)
- finally:
- self.release(delay + time())
-
- def wait(self, test, timeout, period):
- end_time = time() + timeout
- while not test():
- if time() > end_time + self.sync():
- return False
- sleep(period)
- return True
-
-__builtin__.pdb = ClusterPdb()
# Simple 'pdb' builtin: calling pdb() breaks into the debugger in the
# caller's frame (frame 1 as seen from inside the lambda).
# neo.tests.cluster replaces it with a multiprocess-aware ClusterPdb instance.
__builtin__.pdb = lambda: debug.getPdb().set_trace(sys._getframe(1))
Added: trunk/neo/tests/cluster.py
==============================================================================
--- trunk/neo/tests/cluster.py (added)
+++ trunk/neo/tests/cluster.py [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -0,0 +1,289 @@
+#
+# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
+# Julien Muchembled <jm at nexedi.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+import __builtin__
+import errno
+import mmap
+import os
+import psutil
+import signal
+import socket
+import sys
+import tempfile
+from cPickle import dumps, loads
+from functools import wraps
+from time import time, sleep
+from neo.lib import debug
+
+
class SocketLock(object):
    """Basic system-wide lock

    Holding the lock means owning a socket bound to a given address: a
    second bind to the same address fails with EADDRINUSE. For AF_UNIX, a
    NUL byte is prepended to the address so that it lives in the Linux
    abstract namespace instead of the filesystem.
    """

    # Bound socket while the lock is held by this instance, None otherwise.
    _socket = None

    def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
        self.address = '\0' + address if family == socket.AF_UNIX else address
        self.socket_args = family, type

    def locked(self):
        """Return whether this instance currently holds the lock"""
        return self._socket is not None

    def acquire(self, blocking=1):
        """Take the lock and return True

        When 'blocking' is false and the lock is owned elsewhere, return
        False at once; otherwise retry every second until it is free.
        Bind errors other than EADDRINUSE are propagated.
        """
        assert self._socket is None
        sock = socket.socket(*self.socket_args)
        try:
            while True:
                try:
                    sock.bind(self.address)
                except socket.error as exc:
                    if exc.args[0] != errno.EADDRINUSE:
                        raise
                    if not blocking:
                        return False
                    sleep(1)
                    continue
                self._socket = sock
                return True
        finally:
            # The socket is kept only on success; close it on every other path.
            if self._socket is None:
                sock.close()

    def release(self):
        """Release the lock, which must be held by this instance"""
        sock = self._socket
        del self._socket  # fall back to the class-level None
        sock.close()
+
+
class ClusterDict(dict):
    """Simple storage (dict), shared with forked processes

    The pickled content is kept in a shared memory mapping of a temporary
    file, and a pipe holding at most one byte acts as an inter-process lock:
    acquire() consumes the byte and the outermost release() writes it back.
    Both are created in __init__, so only processes forked afterwards share
    the storage.

    The local dict is only meaningful while the lock is held: acquire()
    reloads it from the shared mapping, and commit() (or release(True))
    publishes it.
    """

    # Nesting depth of acquire() minus one: -1 while this process does not
    # hold the lock (the state left by __init__), 0 when held once, and
    # greater than 0 for nested acquisitions.
    _acquired = 0

    def __init__(self, *args, **kw):
        dict.__init__(self, *args, **kw)
        self._r, self._w = os.pipe()
        # shm_open(3) would be better but Python doesn't provide it.
        # See also http://nikitathespider.com/python/shm/
        tmp = tempfile.TemporaryFile()
        try:
            tmp.write(dumps(self.copy(), -1))
            tmp.flush()
            self._shared = mmap.mmap(tmp.fileno(), tmp.tell())
        finally:
            # The mapping remains usable (and resizable) after this.
            tmp.close()
        # Put the token into the pipe: the lock starts free.
        self.release()

    def __del__(self):
        try:
            os.close(self._r)
            os.close(self._w)
        except TypeError:  # os.close may already be None at interpreter exit
            pass

    def acquire(self):
        """Take the inter-process lock and reload the shared content

        Nested calls only increase the nesting depth: the content is not
        reloaded, so local changes made since the outermost acquire() stay.
        If reloading fails, the lock is released before re-raising.
        """
        self._acquired += 1
        if self._acquired:
            return
        os.read(self._r, 1)
        try:
            self.clear()
            mapping = self._shared
            # Another process may have resized the underlying file.
            mapping.resize(mapping.size())
            self.update(loads(mapping[:]))
        except:
            self.release()
            raise

    def release(self, commit=False):
        """Undo one acquire()

        Only the outermost call publishes the local content (when 'commit'
        is true) and hands the token back to the pipe.
        """
        if self._acquired == 0:
            if commit:
                self.commit()
            os.write(self._w, b'\0')
        self._acquired -= 1

    def commit(self):
        """Publish the local content to the shared mapping

        The lock is expected to be held.
        """
        data = dumps(self.copy(), -1)
        mapping = self._shared
        mapping.resize(len(data))
        mapping[:] = data

# Storage shared by all processes forked after this module is imported.
cluster_dict = ClusterDict()
+
+
class ClusterPdb(object):
    """Multiprocess-aware wrapper around console and winpdb debuggers

    __call__ is the method to break.

    Processes coordinate through the shared 'cluster_dict':
      - 'text_pdb' holds the pid of the process owning the console (text
        debugger), so that only one process at a time interacts with it;
      - 'last_pdb' maps a pid to None while that process is stopped in a
        debugger, or to the time it last left that state.

    TODO: monkey-patch normal code not to timeout
          if another node is being debugged
    """

    def __init__(self):
        # (id of code object, line number) of a call site -> hit count
        self._count_dict = {}

    def __setattr__(self, name, value):
        # Assigning a bound method to an attribute named like one of our
        # hooks ('_bdb_interaction', '_rpdb2_set_state') does not store it:
        # the method is replaced, on the object it is bound to, by a wrapper
        # calling the hook with the original method as first argument.
        # Any other assignment is a normal one.
        try:
            hook = getattr(self, name)
            setattr(value.im_self, value.__name__, wraps(value)(
                lambda *args, **kw: hook(value, *args, **kw)))
        except AttributeError:
            object.__setattr__(self, name, value)

    @property
    def broken_peer(self):
        """True if another process is currently stopped in a debugger"""
        # The lock must be taken: only acquire() refreshes the local copy
        # of the shared content.
        cluster_dict.acquire()
        try:
            return self._getLastPdb(os.getpid()) is None
        finally:
            cluster_dict.release()

    def __call__(self, max_count=None, depth=0, text=None):
        """Break into a debugger, 'depth' frames above the caller

        max_count: if set, only break the first 'max_count' times this call
                   site is reached.
        text: None to use winpdb when rpdb2 is importable, falling back to
              the console debugger; another false value to require winpdb
              (ImportError is propagated); a true value to always use the
              console debugger.
        """
        depth += 1
        if max_count:
            frame = sys._getframe(depth)
            key = id(frame.f_code), frame.f_lineno
            del frame  # drop the frame reference as early as possible
            self._count_dict[key] = count = 1 + self._count_dict.get(key, 0)
            if max_count < count:
                return
        if not text:
            try:
                import rpdb2
            except ImportError:
                if text is not None:
                    raise
            else:
                if rpdb2.g_debugger is None:
                    # Hook set_state of the first state manager rpdb2
                    # creates, restoring the original class immediately.
                    rpdb2_CStateManager = rpdb2.CStateManager
                    def CStateManager(*args, **kw):
                        rpdb2.CStateManager = rpdb2_CStateManager
                        state_manager = rpdb2.CStateManager(*args, **kw)
                        self._rpdb2_set_state = state_manager.set_state
                        return state_manager
                    rpdb2.CStateManager = CStateManager
                return debug.winpdb(depth)
        try:
            debugger = self.__dict__['_debugger']
        except KeyError:
            assert 'rpdb2' not in sys.modules
            self._debugger = debugger = debug.getPdb()
            # Goes through __setattr__: hooks debugger.interaction.
            self._bdb_interaction = debugger.interaction
        return debugger.set_trace(sys._getframe(depth))

    def kill(self, pid, sig):
        """Send signal 'sig' to process 'pid', unless it is being debugged

        While 'pid' has an entry in 'last_pdb', this polls every second
        until the entry disappears or the user presses ^C (then the signal
        is sent anyway). If the process turns out to be a zombie, nothing is
        sent. Raises OSError(ESRCH) if the process does not exist. In both
        of these last cases, the debugging state the dead process left
        behind is dropped. The previous SIGINT handler is restored on exit.
        """
        force = []
        sigint_handler = None
        try:
            while 1:
                cluster_dict.acquire()
                try:
                    last_pdb = cluster_dict.get('last_pdb', {})
                    if force or pid not in last_pdb:
                        os.kill(pid, sig)
                        last_pdb.pop(pid, None)
                        cluster_dict.commit()
                        break
                    try:
                        zombie = (psutil.Process(pid).status
                                  == psutil.STATUS_ZOMBIE)
                    except psutil.NoSuchProcess:
                        self._forget(pid)
                        raise OSError(errno.ESRCH, 'No such process')
                    if zombie:
                        self._forget(pid)
                        break
                finally:
                    cluster_dict.release()
                if sigint_handler is None:
                    sigint_handler = signal.signal(signal.SIGINT,
                        lambda *args: force.append(None))
                    sys.stderr.write('Pid %u is/was debugged.'
                                     ' Press ^C to kill it...' % pid)
                sleep(1)
        finally:
            if sigint_handler is not None:
                signal.signal(signal.SIGINT, sigint_handler)
                if force:
                    sys.stderr.write('\n')

    def _forget(self, pid):
        """Drop the debugging state of a process that no longer runs

        A stale None entry in 'last_pdb' would make wait() never time out,
        and a stale 'text_pdb' would block _lock_console() forever.
        The lock must be held.
        """
        last_pdb = cluster_dict.get('last_pdb')
        if last_pdb is not None:
            last_pdb.pop(pid, None)
        if cluster_dict.get('text_pdb') == pid:
            del cluster_dict['text_pdb']
        cluster_dict.commit()

    def _lock_console(self):
        """Wait until this process owns the console, and mark it as stopped"""
        while 1:
            cluster_dict.acquire()
            try:
                if 'text_pdb' not in cluster_dict:
                    cluster_dict['text_pdb'] = pid = os.getpid()
                    cluster_dict.setdefault('last_pdb', {})[pid] = None
                    cluster_dict.commit()
                    break
            finally:
                cluster_dict.release()
            sleep(0.5)

    def _unlock_console(self):
        """Give the console back and record when this process resumed"""
        cluster_dict.acquire()
        try:
            pid = cluster_dict.pop('text_pdb')
            cluster_dict['last_pdb'][pid] = time()
            cluster_dict.commit()
        finally:
            cluster_dict.release()

    def _bdb_interaction(self, hooked, *args, **kw):
        """Hook around the console debugger's interaction()"""
        self._lock_console()
        try:
            return hooked(*args, **kw)
        finally:
            self._unlock_console()

    def _rpdb2_set_state(self, hooked, state=None, *args, **kw):
        """Hook around rpdb2's CStateManager.set_state()

        Records the state in 'last_pdb': entry removed when detached, None
        when broken, the current time otherwise.
        """
        from rpdb2 import STATE_BROKEN, STATE_DETACHED
        cluster_dict.acquire()
        try:
            if state is None:
                state = hooked.im_self.get_state()
            last_pdb = cluster_dict.setdefault('last_pdb', {})
            pid = os.getpid()
            if state == STATE_DETACHED:
                last_pdb.pop(pid, None)
            else:
                last_pdb[pid] = None if state == STATE_BROKEN else time()
            # Pass 'state' positionally so that extra positional arguments
            # do not clash with it.
            return hooked(state, *args, **kw)
        finally:
            cluster_dict.release(True)

    def _getLastPdb(self, *exclude):
        """Summarize 'last_pdb', ignoring the pids in 'exclude'

        Returns None if one of the other processes is stopped in a debugger,
        else the most recent time one of them left it (0 if none did).
        The lock is expected to be held so that cluster_dict is up to date.
        """
        result = 0
        for pid, last_pdb in cluster_dict.get('last_pdb', {}).iteritems():
            if pid not in exclude:
                if last_pdb is None:
                    return
                if result < last_pdb:
                    result = last_pdb
        return result

    def wait(self, test, timeout, period):
        """Poll 'test' every 'period' seconds until it returns a true value

        Returns True on success, False once 'timeout' seconds have elapsed.
        The timeout is suspended while any process is stopped in a debugger,
        and extended to at least 'timeout' seconds after a process last
        left a debugger.
        """
        end_time = time() + timeout
        while not test():
            cluster_dict.acquire()
            try:
                last_pdb = self._getLastPdb()
                if last_pdb is not None and \
                   time() > max(last_pdb + timeout, end_time):
                    return False
            finally:
                cluster_dict.release()
            sleep(period)
        return True
+
# Expose the multiprocess-aware debugger everywhere as the 'pdb' builtin,
# replacing the simpler one that neo.tests installs.
__builtin__.pdb = ClusterPdb()

# SIGUSR2 breaks into the debugger. depth=2 presumably skips this lambda and
# the wrapper added by debug.decorate (not visible here), so that the break
# happens in the interrupted frame -- TODO confirm.
signal.signal(signal.SIGUSR2, debug.decorate(lambda sig, frame: pdb(depth=2)))
Modified: trunk/neo/tests/functional/__init__.py
==============================================================================
--- trunk/neo/tests/functional/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/__init__.py [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -29,13 +29,15 @@ import unittest
import tempfile
import traceback
import threading
+import psutil
import neo.scripts
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
from neo.lib.util import dump, SOCKET_CONNECTORS_DICT
from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase, buildUrlFromString, \
- ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, SocketLock, getTempDirectory
+ ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
+from neo.tests.cluster import SocketLock
from neo.client.Storage import Storage
NEO_MASTER = 'neomaster'
@@ -150,14 +152,10 @@ class NEOProcess(object):
def kill(self, sig=signal.SIGTERM):
    """Send 'sig' to the process through the cluster-aware pdb.kill

    pdb.kill holds the signal back while the process is being debugged.
    OSError raised by it is printed and otherwise ignored.
    Raises AlreadyStopped if there is no pid.
    """
    if self.pid:
        try:
            pdb.kill(self.pid, sig)
        except OSError:
            # print_last() only works after an exception reached the
            # interactive prompt (sys.last_* is unset otherwise): print the
            # exception being handled instead.
            traceback.print_exc()
    else:
        raise AlreadyStopped
@@ -202,15 +200,9 @@ class NEOProcess(object):
def isAlive(self):
    """Return whether the process exists and has not exited

    A zombie (exited but not yet reaped) counts as dead.
    """
    try:
        status = psutil.Process(self.pid).status
    except psutil.NoSuchProcess:
        return False
    return status != psutil.STATUS_ZOMBIE
class NEOCluster(object):
More information about the Neo-report
mailing list