[Neo-report] r2414 vincent - in /trunk/neo: ./ client/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Nov 4 17:57:13 CET 2010


Author: vincent
Date: Thu Nov  4 17:57:11 2010
New Revision: 2414

Log:
Keep client consistent after close.

- set master_conn to None to clarify disconnection
- purge node pool after closing all connections
- allow restarting polling thread after its shutdown
Also, only start polling thread when needed (side-effect of last point).

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/poll.py
    trunk/neo/client/pool.py
    trunk/neo/connection.py
    trunk/neo/dispatcher.py
    trunk/neo/tests/testDispatcher.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Thu Nov  4 17:57:11 2010
@@ -126,13 +126,12 @@ class Application(object):
         # Start polling thread
         self.em = EventManager()
         self.poll_thread = ThreadedPoll(self.em, name=name)
-        neo.logging.debug('Started %s', self.poll_thread)
         psThreadedPoll()
         # Internal Attributes common to all thread
         self._db = None
         self.name = name
         self.connector_handler = getConnectorHandler(connector)
-        self.dispatcher = Dispatcher()
+        self.dispatcher = Dispatcher(self.poll_thread)
         self.nm = NodeManager()
         self.cp = ConnectionPool(self)
         self.pt = None
@@ -1209,6 +1208,8 @@ class Application(object):
         # down zope, so use __del__ to close connections
         for conn in self.em.getConnectionList():
             conn.close()
+        self.cp.flush()
+        self.master_conn = None
         # Stop polling thread
         neo.logging.debug('Stopping %s', self.poll_thread)
         self.poll_thread.stop()

Modified: trunk/neo/client/poll.py
==============================================================================
--- trunk/neo/client/poll.py [iso-8859-1] (original)
+++ trunk/neo/client/poll.py [iso-8859-1] Thu Nov  4 17:57:11 2010
@@ -18,7 +18,7 @@
 from threading import Thread, Event, enumerate as thread_enum
 import neo
 
-class ThreadedPoll(Thread):
+class _ThreadedPoll(Thread):
     """Polling thread."""
 
     # Garbage collector hint:
@@ -31,20 +31,55 @@ class ThreadedPoll(Thread):
         self.em = em
         self.setDaemon(True)
         self._stop = Event()
-        self.start()
 
     def run(self):
-        while not self._stop.isSet():
+        neo.logging.debug('Started %s', self)
+        while not self.stopping():
             # First check if we receive any new message from other node
             try:
-                self.em.poll()
+                # XXX: Delay cannot be infinite here, unless we have a way to
+                # interrupt this call when stopping.
+                self.em.poll(1)
             except:
                 self.neo.logging.error('poll raised, retrying', exc_info=1)
         self.neo.logging.debug('Threaded poll stopped')
+        self._stop.clear()
 
     def stop(self):
         self._stop.set()
 
+    def stopping(self):
+        return self._stop.isSet()
+
+class ThreadedPoll(object):
+    """
+    Wrapper for polling thread, just to be able to start it again when
+    it stopped.
+    """
+    _thread = None
+    _started = False
+
+    def __init__(self, *args, **kw):
+        self._args = args
+        self._kw = kw
+        self.newThread()
+
+    def newThread(self):
+        self._thread = _ThreadedPoll(*self._args, **self._kw)
+
+    def start(self):
+        if self._started:
+            self.newThread()
+        else:
+            self._started = True
+        self._thread.start()
+
+    def __getattr__(self, key):
+        return getattr(self._thread, key)
+
+    def __repr__(self):
+        return repr(self._thread)
+
 def psThreadedPoll(log=None):
     """
     Logs alive ThreadedPoll threads.

Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Thu Nov  4 17:57:11 2010
@@ -178,3 +178,7 @@ class ConnectionPool(object):
         """Explicitly remove connection when a node is broken."""
         self.connection_dict.pop(node.getUUID(), None)
 
+    def flush(self):
+        """Remove all connections"""
+        self.connection_dict.clear()
+

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Thu Nov  4 17:57:11 2010
@@ -705,6 +705,7 @@ class MTClientConnection(ClientConnectio
         self.acquire = lock.acquire
         self.release = lock.release
         self.dispatcher = kwargs.pop('dispatcher')
+        self.dispatcher.needPollThread()
         self.lock()
         try:
             super(MTClientConnection, self).__init__(*args, **kwargs)

Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Thu Nov  4 17:57:11 2010
@@ -44,12 +44,13 @@ def giant_lock(func):
 class Dispatcher:
     """Register a packet, connection pair as expecting a response packet."""
 
-    def __init__(self):
+    def __init__(self, poll_thread=None):
         self.message_table = {}
         self.queue_dict = {}
         lock = Lock()
         self.lock_acquire = lock.acquire
         self.lock_release = lock.release
+        self.poll_thread = poll_thread
 
     @giant_lock
     @profiler_decorator
@@ -64,10 +65,27 @@ class Dispatcher:
         queue.put(data)
         return True
 
+    def needPollThread(self):
+        thread = self.poll_thread
+        # If a stop has been requested, wait for the thread to terminate
+        # Note: This is not, ironically, thread safe: if one thread is
+        # stopping poll thread while we are checking its state here, a
+        # race condition will occur. If safety is required, locks should
+        # be added to control the access to thread's "start", "stopping"
+        # and "stop" methods.
+        if thread.stopping():
+            # XXX: ideally, we should wake thread up here, to be sure not
+            # to wait forever.
+            thread.join()
+        if not thread.isAlive():
+            thread.start()
+
     @giant_lock
     @profiler_decorator
     def register(self, conn, msg_id, queue):
         """Register an expectation for a reply."""
+        if self.poll_thread is not None:
+            self.needPollThread()
         self.message_table.setdefault(id(conn), {})[msg_id] = queue
         queue_dict = self.queue_dict
         key = id(queue)

Modified: trunk/neo/tests/testDispatcher.py
==============================================================================
--- trunk/neo/tests/testDispatcher.py [iso-8859-1] (original)
+++ trunk/neo/tests/testDispatcher.py [iso-8859-1] Thu Nov  4 17:57:11 2010
@@ -25,7 +25,8 @@ class DispatcherTests(NeoTestBase):
 
     def setUp(self):
         NeoTestBase.setUp(self)
-        self.dispatcher = Dispatcher()
+        self.fake_thread = Mock({'stopping': True})
+        self.dispatcher = Dispatcher(self.fake_thread)
 
     def testRegister(self):
         conn = object()
@@ -38,6 +39,7 @@ class DispatcherTests(NeoTestBase):
         self.assertTrue(queue.get(block=False) is MARKER)
         self.assertTrue(queue.empty())
         self.assertFalse(self.dispatcher.dispatch(conn, 2, None))
+        self.assertEqual(len(self.fake_thread.mockGetNamedCalls('start')), 1)
 
     def testUnregister(self):
         conn = object()





More information about the Neo-report mailing list