[Neo-report] r2414 vincent - in /trunk/neo: ./ client/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Nov 4 17:57:13 CET 2010
Author: vincent
Date: Thu Nov 4 17:57:11 2010
New Revision: 2414
Log:
Keep client consistent after close.
- set master_conn to None to clarify disconnection
- purge node pool after closing all connections
- allow restarting polling thread after its shutdown
Also, only start polling thread when needed (side-effect of last point).
Modified:
trunk/neo/client/app.py
trunk/neo/client/poll.py
trunk/neo/client/pool.py
trunk/neo/connection.py
trunk/neo/dispatcher.py
trunk/neo/tests/testDispatcher.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Thu Nov 4 17:57:11 2010
@@ -126,13 +126,12 @@ class Application(object):
# Start polling thread
self.em = EventManager()
self.poll_thread = ThreadedPoll(self.em, name=name)
- neo.logging.debug('Started %s', self.poll_thread)
psThreadedPoll()
# Internal Attributes common to all thread
self._db = None
self.name = name
self.connector_handler = getConnectorHandler(connector)
- self.dispatcher = Dispatcher()
+ self.dispatcher = Dispatcher(self.poll_thread)
self.nm = NodeManager()
self.cp = ConnectionPool(self)
self.pt = None
@@ -1209,6 +1208,8 @@ class Application(object):
# down zope, so use __del__ to close connections
for conn in self.em.getConnectionList():
conn.close()
+ self.cp.flush()
+ self.master_conn = None
# Stop polling thread
neo.logging.debug('Stopping %s', self.poll_thread)
self.poll_thread.stop()
Modified: trunk/neo/client/poll.py
==============================================================================
--- trunk/neo/client/poll.py [iso-8859-1] (original)
+++ trunk/neo/client/poll.py [iso-8859-1] Thu Nov 4 17:57:11 2010
@@ -18,7 +18,7 @@
from threading import Thread, Event, enumerate as thread_enum
import neo
-class ThreadedPoll(Thread):
+class _ThreadedPoll(Thread):
"""Polling thread."""
# Garbage collector hint:
@@ -31,20 +31,55 @@ class ThreadedPoll(Thread):
self.em = em
self.setDaemon(True)
self._stop = Event()
- self.start()
def run(self):
- while not self._stop.isSet():
+ neo.logging.debug('Started %s', self)
+ while not self.stopping():
# First check if we receive any new message from other node
try:
- self.em.poll()
+ # XXX: Delay cannot be infinite here, unless we have a way to
+ # interrupt this call when stopping.
+ self.em.poll(1)
except:
self.neo.logging.error('poll raised, retrying', exc_info=1)
self.neo.logging.debug('Threaded poll stopped')
+ self._stop.clear()
def stop(self):
self._stop.set()
+ def stopping(self):
+ return self._stop.isSet()
+
+class ThreadedPoll(object):
+ """
+ Wrapper for polloing thread, just to be able to start it again when
+ it stopped.
+ """
+ _thread = None
+ _started = False
+
+ def __init__(self, *args, **kw):
+ self._args = args
+ self._kw = kw
+ self.newThread()
+
+ def newThread(self):
+ self._thread = _ThreadedPoll(*self._args, **self._kw)
+
+ def start(self):
+ if self._started:
+ self.newThread()
+ else:
+ self._started = True
+ self._thread.start()
+
+ def __getattr__(self, key):
+ return getattr(self._thread, key)
+
+ def __repr__(self):
+ return repr(self._thread)
+
def psThreadedPoll(log=None):
"""
Logs alive ThreadedPoll threads.
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Thu Nov 4 17:57:11 2010
@@ -178,3 +178,7 @@ class ConnectionPool(object):
"""Explicitly remove connection when a node is broken."""
self.connection_dict.pop(node.getUUID(), None)
+ def flush(self):
+ """Remove all connections"""
+ self.connection_dict.clear()
+
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Thu Nov 4 17:57:11 2010
@@ -705,6 +705,7 @@ class MTClientConnection(ClientConnectio
self.acquire = lock.acquire
self.release = lock.release
self.dispatcher = kwargs.pop('dispatcher')
+ self.dispatcher.needPollThread()
self.lock()
try:
super(MTClientConnection, self).__init__(*args, **kwargs)
Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Thu Nov 4 17:57:11 2010
@@ -44,12 +44,13 @@ def giant_lock(func):
class Dispatcher:
"""Register a packet, connection pair as expecting a response packet."""
- def __init__(self):
+ def __init__(self, poll_thread=None):
self.message_table = {}
self.queue_dict = {}
lock = Lock()
self.lock_acquire = lock.acquire
self.lock_release = lock.release
+ self.poll_thread = poll_thread
@giant_lock
@profiler_decorator
@@ -64,10 +65,27 @@ class Dispatcher:
queue.put(data)
return True
+ def needPollThread(self):
+ thread = self.poll_thread
+ # If thread has been stopped, wait for it to stop
+ # Note: This is not, ironically, thread safe: if one thread is
+ # stopping poll thread while we are checking its state here, a
+ # race condition will occur. If safety is required, locks should
+ # be added to control the access to thread's "start", "stopping"
+ # and "stop" methods.
+ if thread.stopping():
+ # XXX: ideally, we should wake thread up here, to be sure not
+ # to wait forever.
+ thread.join()
+ if not thread.isAlive():
+ thread.start()
+
@giant_lock
@profiler_decorator
def register(self, conn, msg_id, queue):
"""Register an expectation for a reply."""
+ if self.poll_thread is not None:
+ self.needPollThread()
self.message_table.setdefault(id(conn), {})[msg_id] = queue
queue_dict = self.queue_dict
key = id(queue)
Modified: trunk/neo/tests/testDispatcher.py
==============================================================================
--- trunk/neo/tests/testDispatcher.py [iso-8859-1] (original)
+++ trunk/neo/tests/testDispatcher.py [iso-8859-1] Thu Nov 4 17:57:11 2010
@@ -25,7 +25,8 @@ class DispatcherTests(NeoTestBase):
def setUp(self):
NeoTestBase.setUp(self)
- self.dispatcher = Dispatcher()
+ self.fake_thread = Mock({'stopping': True})
+ self.dispatcher = Dispatcher(self.fake_thread)
def testRegister(self):
conn = object()
@@ -38,6 +39,7 @@ class DispatcherTests(NeoTestBase):
self.assertTrue(queue.get(block=False) is MARKER)
self.assertTrue(queue.empty())
self.assertFalse(self.dispatcher.dispatch(conn, 2, None))
+ self.assertEqual(len(self.fake_thread.mockGetNamedCalls('start')), 1)
def testUnregister(self):
conn = object()
More information about the Neo-report
mailing list