[Neo-report] r2725 jm - in /trunk: buildout/software-profiles/ neo/tests/ neo/tests/functional/

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Apr 20 20:21:00 CEST 2011


Author: jm
Date: Wed Apr 20 20:21:00 2011
New Revision: 2725

Log:
tests: extend bdb/rpdb2 debuggers to help debugging a set of processes

This adds a dependency on 'psutil', which is required anyway to fix
NEOProcess.isAlive.

Added:
    trunk/neo/tests/cluster.py
Modified:
    trunk/buildout/software-profiles/neo.cfg
    trunk/neo/tests/__init__.py
    trunk/neo/tests/functional/__init__.py

Modified: trunk/buildout/software-profiles/neo.cfg
==============================================================================
--- trunk/buildout/software-profiles/neo.cfg [iso-8859-1] (original)
+++ trunk/buildout/software-profiles/neo.cfg [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -24,7 +24,6 @@ eggs =
   neostorage
   neoclient
   neomaster
+  # for unit tests
   zope.testing
-
-
-
+  psutil

Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -16,7 +16,6 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 import __builtin__
-import errno
 import os
 import random
 import socket
@@ -30,9 +29,8 @@ from mock import Mock
 from neo.lib import debug, logger, protocol
 from neo.lib.protocol import Packets
 from neo.lib.util import getAddressType
-from time import time, gmtime, sleep
+from time import time, gmtime
 from struct import pack, unpack
-from functools import wraps
 
 DB_PREFIX = os.getenv('NEO_DB_PREFIX', 'test_neo_')
 DB_ADMIN = os.getenv('NEO_DB_ADMIN', 'root')
@@ -507,93 +505,5 @@ class DoNothingConnector(Mock):
     def getDescriptor(self):
         return self.desc
 
-class SocketLock(object):
-    """Basic system-wide lock"""
 
-    _socket = None
-
-    def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
-        if family == socket.AF_UNIX:
-            address = '\0' + address
-        self.address = address
-        self.socket_args = family, type
-
-    def locked(self):
-        return self._socket is not None
-
-    def acquire(self, blocking=1):
-        assert self._socket is None
-        s = socket.socket(*self.socket_args)
-        try:
-            while True:
-                try:
-                    s.bind(self.address)
-                except socket.error, e:
-                    if e[0] != errno.EADDRINUSE:
-                        raise
-                    if not blocking:
-                        return False
-                    sleep(1)
-                else:
-                    self._socket = s
-                    return True
-        finally:
-            if self._socket is None:
-                s.close()
-
-    def release(self):
-        s = self._socket
-        del self._socket
-        s.close()
-
-
-class ClusterPdb(object):
-    # TODO: monkey-patch normal code not to timeout
-    #       if another node is being debugged
-
-    def __init__(self):
-        self._r, self._w = os.pipe()
-        self.release(0)
-
-    def __getattr__(self, attr):
-        try:
-            debugger = self.__dict__['_debugger']
-        except KeyError:
-            self._debugger = debugger = debug.getPdb()
-            def hook(name):
-                hook = getattr(self, name)
-                hooked = getattr(debugger, name)
-                def wrapper(*args, **kw):
-                    return hook(hooked, *args, **kw)
-                setattr(debugger, name, wraps(hooked)(wrapper))
-            hook('interaction')
-        return getattr(debugger, attr)
-
-    def acquire(self):
-        return unpack('d', os.read(self._r, 8))[0]
-
-    def release(self, delay):
-        os.write(self._w, pack('d', delay))
-
-    def sync(self):
-        """Sleep as long as another process owns the lock"""
-        delay = self.acquire()
-        self.release(delay)
-        return delay
-
-    def interaction(self, hooked, *args, **kw):
-        delay = self.acquire() - time()
-        try:
-            return hooked(*args, **kw)
-        finally:
-            self.release(delay + time())
-
-    def wait(self, test, timeout, period):
-        end_time = time() + timeout
-        while not test():
-            if time() > end_time + self.sync():
-                return False
-            sleep(period)
-        return True
-
-__builtin__.pdb = ClusterPdb()
+__builtin__.pdb = lambda: debug.getPdb().set_trace(sys._getframe(1))

Added: trunk/neo/tests/cluster.py
==============================================================================
--- trunk/neo/tests/cluster.py (added)
+++ trunk/neo/tests/cluster.py [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -0,0 +1,289 @@
+#
+# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
+#                    Julien Muchembled <jm at nexedi.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+import __builtin__
+import errno
+import mmap
+import os
+import psutil
+import signal
+import socket
+import sys
+import tempfile
+from cPickle import dumps, loads
+from functools import wraps
+from time import time, sleep
+from neo.lib import debug
+
+
+class SocketLock(object):
+    """Basic system-wide lock"""
+
+    _socket = None
+
+    def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
+        if family == socket.AF_UNIX:
+            address = '\0' + address
+        self.address = address
+        self.socket_args = family, type
+
+    def locked(self):
+        return self._socket is not None
+
+    def acquire(self, blocking=1):
+        assert self._socket is None
+        s = socket.socket(*self.socket_args)
+        try:
+            while True:
+                try:
+                    s.bind(self.address)
+                except socket.error, e:
+                    if e[0] != errno.EADDRINUSE:
+                        raise
+                    if not blocking:
+                        return False
+                    sleep(1)
+                else:
+                    self._socket = s
+                    return True
+        finally:
+            if self._socket is None:
+                s.close()
+
+    def release(self):
+        s = self._socket
+        del self._socket
+        s.close()
+
+
+class ClusterDict(dict):
+    """Simple storage (dict), shared with forked processes"""
+
+    _acquired = 0
+
+    def __init__(self, *args, **kw):
+        dict.__init__(self, *args, **kw)
+        self._r, self._w = os.pipe()
+        # shm_open(3) would be better but Python doesn't provide it.
+        # See also http://nikitathespider.com/python/shm/
+        f = tempfile.TemporaryFile()
+        try:
+            f.write(dumps(self.copy(), -1))
+            f.flush()
+            self._shared = mmap.mmap(f.fileno(), f.tell())
+        finally:
+            f.close()
+        self.release()
+
+    def __del__(self):
+        try:
+            os.close(self._r)
+            os.close(self._w)
+        except TypeError: # if os.close is None
+            pass
+
+    def acquire(self):
+        self._acquired += 1
+        if not self._acquired:
+            os.read(self._r, 1)
+            try:
+                self.clear()
+                shared = self._shared
+                shared.resize(shared.size())
+                self.update(loads(shared[:]))
+            except:
+                self.release()
+                raise
+
+    def release(self, commit=False):
+        if not self._acquired:
+            if commit:
+                self.commit()
+            os.write(self._w, '\0')
+        self._acquired -= 1
+
+    def commit(self):
+        shared = self._shared
+        p = dumps(self.copy(), -1)
+        shared.resize(len(p))
+        shared[:] = p
+
+cluster_dict = ClusterDict()
+
+
+class ClusterPdb(object):
+    """Multiprocess-aware wrapper around console and winpdb debuggers
+
+    __call__ is the method to break.
+
+    TODO: monkey-patch normal code not to timeout
+          if another node is being debugged
+    """
+
+    def __init__(self):
+        self._count_dict = {}
+
+    def __setattr__(self, name, value):
+        try:
+            hook = getattr(self, name)
+            setattr(value.im_self, value.__name__, wraps(value)(
+                lambda *args, **kw: hook(value, *args, **kw)))
+        except AttributeError:
+            object.__setattr__(self, name, value)
+
+    @property
+    def broken_peer(self):
+        return self._getLastPdb(os.getpid()) is None
+
+    def __call__(self, max_count=None, depth=0, text=None):
+        depth += 1
+        if max_count:
+            frame = sys._getframe(depth)
+            key = id(frame.f_code), frame.f_lineno
+            del frame
+            self._count_dict[key] = count = 1 + self._count_dict.get(key, 0)
+            if max_count < count:
+                return
+        if not text:
+            try:
+                import rpdb2
+            except ImportError:
+                if text is not None:
+                    raise
+            else:
+                if rpdb2.g_debugger is None:
+                    rpdb2_CStateManager = rpdb2.CStateManager
+                    def CStateManager(*args, **kw):
+                        rpdb2.CStateManager = rpdb2_CStateManager
+                        state_manager = rpdb2.CStateManager(*args, **kw)
+                        self._rpdb2_set_state = state_manager.set_state
+                        return state_manager
+                    rpdb2.CStateManager = CStateManager
+                return debug.winpdb(depth)
+        try:
+            debugger = self.__dict__['_debugger']
+        except KeyError:
+            assert 'rpdb2' not in sys.modules
+            self._debugger = debugger = debug.getPdb()
+            self._bdb_interaction = debugger.interaction
+        return debugger.set_trace(sys._getframe(depth))
+
+    def kill(self, pid, sig):
+        force = []
+        sigint_handler = None
+        try:
+            while 1:
+                cluster_dict.acquire()
+                try:
+                    last_pdb = cluster_dict.get('last_pdb', {})
+                    if force or pid not in last_pdb:
+                        os.kill(pid, sig)
+                        last_pdb.pop(pid, None)
+                        cluster_dict.commit()
+                        break
+                    try:
+                        if psutil.Process(pid).status == psutil.STATUS_ZOMBIE:
+                            break
+                    except psutil.NoSuchProcess:
+                        raise OSError(errno.ESRCH, 'No such process')
+                finally:
+                    cluster_dict.release()
+                if sigint_handler is None:
+                    sigint_handler = signal.signal(signal.SIGINT,
+                        lambda *args: force.append(None))
+                    sys.stderr.write('Pid %u is/was debugged.'
+                                    ' Press ^C to kill it...' % pid)
+                sleep(1)
+        finally:
+            if sigint_handler is not None:
+                signal.signal(signal.SIGINT, sigint_handler)
+        if force:
+            sys.stderr.write('\n')
+
+    def _lock_console(self):
+        while 1:
+            cluster_dict.acquire()
+            try:
+                if 'text_pdb' not in cluster_dict:
+                    cluster_dict['text_pdb'] = pid = os.getpid()
+                    cluster_dict.setdefault('last_pdb', {})[pid] = None
+                    cluster_dict.commit()
+                    break
+            finally:
+                cluster_dict.release()
+            sleep(0.5)
+
+    def _unlock_console(self):
+        cluster_dict.acquire()
+        try:
+            pid = cluster_dict.pop('text_pdb')
+            cluster_dict['last_pdb'][pid] = time()
+            cluster_dict.commit()
+        finally:
+            cluster_dict.release()
+
+    def _bdb_interaction(self, hooked, *args, **kw):
+        self._lock_console()
+        try:
+            return hooked(*args, **kw)
+        finally:
+            self._unlock_console()
+
+    def _rpdb2_set_state(self, hooked, state=None, *args, **kw):
+        from rpdb2 import STATE_BROKEN, STATE_DETACHED
+        cluster_dict.acquire()
+        try:
+            if state is None:
+                state = hooked.im_self.get_state()
+            last_pdb = cluster_dict.setdefault('last_pdb', {})
+            pid = os.getpid()
+            if state == STATE_DETACHED:
+                last_pdb.pop(pid, None)
+            else:
+                last_pdb[pid] = state != STATE_BROKEN and time() or None
+            return hooked(state=state, *args, **kw)
+        finally:
+            cluster_dict.release(True)
+
+    def _getLastPdb(self, *exclude):
+        result = 0
+        for pid, last_pdb in cluster_dict.get('last_pdb', {}).iteritems():
+            if pid not in exclude:
+                if last_pdb is None:
+                    return
+                if result < last_pdb:
+                    result = last_pdb
+        return result
+
+    def wait(self, test, timeout, period):
+        end_time = time() + timeout
+        while not test():
+            cluster_dict.acquire()
+            try:
+                last_pdb = self._getLastPdb()
+                if last_pdb is not None and \
+                   time() > max(last_pdb + timeout, end_time):
+                    return False
+            finally:
+                cluster_dict.release()
+            sleep(period)
+        return True
+
+__builtin__.pdb = ClusterPdb()
+
+signal.signal(signal.SIGUSR2, debug.decorate(lambda sig, frame: pdb(depth=2)))

Modified: trunk/neo/tests/functional/__init__.py
==============================================================================
--- trunk/neo/tests/functional/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/__init__.py [iso-8859-1] Wed Apr 20 20:21:00 2011
@@ -29,13 +29,15 @@ import unittest
 import tempfile
 import traceback
 import threading
+import psutil
 
 import neo.scripts
 from neo.neoctl.neoctl import NeoCTL, NotReadyException
 from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
 from neo.lib.util import dump, SOCKET_CONNECTORS_DICT
 from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase, buildUrlFromString, \
-        ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, SocketLock, getTempDirectory
+        ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
+from neo.tests.cluster import SocketLock
 from neo.client.Storage import Storage
 
 NEO_MASTER = 'neomaster'
@@ -150,14 +152,10 @@ class NEOProcess(object):
 
     def kill(self, sig=signal.SIGTERM):
         if self.pid:
-            delay = pdb.acquire()
             try:
-                try:
-                    os.kill(self.pid, sig)
-                except OSError:
-                    traceback.print_last()
-            finally:
-                pdb.release(delay)
+                pdb.kill(self.pid, sig)
+            except OSError:
+                traceback.print_last()
         else:
             raise AlreadyStopped
 
@@ -202,15 +200,9 @@ class NEOProcess(object):
 
     def isAlive(self):
         try:
-            os.kill(self.pid, 0)
-        except OSError, (errno, msg):
-            if errno == 3: # No such process
-                result = False
-            else:
-                raise
-        else:
-            result = True
-        return result
+            return psutil.Process(self.pid).status != psutil.STATUS_ZOMBIE
+        except psutil.NoSuchProcess:
+            return False
 
 class NEOCluster(object):
 




More information about the Neo-report mailing list