[Neo-report] r2009 vincent - in /trunk/neo: connection.py tests/testConnection.py

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Apr 22 17:55:09 CEST 2010


Author: vincent
Date: Thu Apr 22 17:55:09 2010
New Revision: 2009

Log:
Change (again) the way timeouts are handled.

There are two distinct kinds of timeout events:
- an unresponsive node
  This is a connection-level timeout.
  It is handled by the Timeout class, which triggers pings and monitors
  incoming data to decide when the remote node is considered dead.
- request processing that takes "too long" on an otherwise responsive node
  This is a per-request timeout.
  It is handled by the HandlerSwitcher class, which only triggers a
  disconnection when an answer takes too long to arrive (historical
  behaviour, not very useful when exchanging with a single-threaded peer).

The previous implementation mixed both and had shortcomings: a ping
would time out almost immediately, and it was not possible to tell which
message caused a timeout.

Update tests.

Modified:
    trunk/neo/connection.py
    trunk/neo/tests/testConnection.py

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Thu Apr 22 17:55:09 2010
@@ -31,7 +31,7 @@
 from neo.util import ReadBuffer
 from neo.profiling import profiler_decorator
 
-PING_DELAY = 5
+PING_DELAY = 6
 PING_TIMEOUT = 5
 INCOMING_TIMEOUT = 10
 CRITICAL_TIMEOUT = 30
@@ -70,6 +70,8 @@
 
 
 class HandlerSwitcher(object):
+    _next_timeout = None
+    _next_timeout_msg_id = None
 
     def __init__(self, connection, handler):
         self._connection = connection
@@ -87,7 +89,7 @@
         return self._pending[0][1]
 
     @profiler_decorator
-    def emit(self, request):
+    def emit(self, request, timeout):
         # register the request in the current handler
         _pending = self._pending
         assert len(_pending) == 1 or _pending[0][0]
@@ -96,7 +98,19 @@
         answer_class = request.getAnswerClass()
         assert answer_class is not None, "Not a request"
         assert msg_id not in request_dict, "Packet id already expected"
-        request_dict[msg_id] = answer_class
+        next_timeout = self._next_timeout
+        if next_timeout is None or timeout < next_timeout:
+            self._next_timeout = timeout
+            self._next_timeout_msg_id = msg_id
+        request_dict[msg_id] = (answer_class, timeout)
+
+    def checkTimeout(self, t):
+        next_timeout = self._next_timeout
+        if next_timeout is not None and next_timeout < t:
+            result = self._next_timeout_msg_id
+        else:
+            result = None
+        return result
 
     @profiler_decorator
     def handle(self, packet):
@@ -109,7 +123,7 @@
             handler.packetReceived(self._connection, packet)
             return
         # checkout the expected answer class
-        klass = request_dict.pop(msg_id, None)
+        (klass, timeout) = request_dict.pop(msg_id, (None, None))
         if klass and isinstance(packet, klass) or packet.isError():
             handler.packetReceived(self._connection, packet)
         else:
@@ -122,6 +136,18 @@
         while len(self._pending) > 1 and not self._pending[0][0]:
             del self._pending[0]
             logging.debug('Apply handler %r', self._pending[0][1])
+        if timeout == self._next_timeout:
+            # Find next timeout and its msg_id
+            timeout_list = []
+            extend = timeout_list.extend
+            for (request_dict, handler) in self._pending:
+                extend(((timeout, msg_id) \
+                    for msg_id, (_, timeout) in request_dict.iteritems()))
+            if timeout_list:
+                timeout_list.sort(key=lambda x: x[0])
+                self._next_timeout, self._next_timeout_msg_id = timeout_list[0]
+            else:
+                self._next_timeout, self._next_timeout_msg_id = None, None
 
     @profiler_decorator
     def setHandler(self, handler):
@@ -136,31 +162,48 @@
 
 
 class Timeout(object):
-    """ Keep track of current timeouts """
+    """ Keep track of connection-level timeouts """
 
     def __init__(self):
         self._ping_time = None
         self._critical_time = None
 
-    def update(self, t, timeout=CRITICAL_TIMEOUT):
-        """ Update the new critical time """
-        self._ping_time = t + PING_TIMEOUT
-        critical_time = self._ping_time + timeout
-        self._critical_time = max(critical_time, self._critical_time)
+    def update(self, t):
+        """
+        Send occurred:
+        - set ping time if earlier than existing one
+        """
+        ping_time = self._ping_time
+        t += PING_DELAY
+        if ping_time is None or t < ping_time:
+            self._ping_time = t
 
     def refresh(self, t):
-        """ Refresh timeout after something received """
+        """
+        Recv occured:
+        - reschedule next ping time
+        - as this is an evidence that node is alive, remove pong expectation
+        """
         self._ping_time = t + PING_DELAY
+        self._critical_time = None
+
+    def ping(self, t):
+        """
+        Ping send occured:
+        - reschedule next ping time
+        - set pong expectation
+        """
+        self._ping_time = t + PING_DELAY
+        self._critical_time = t + PING_TIMEOUT
 
     def softExpired(self, t):
-        """ Indicate if the soft timeout (ping delay) is reached """
-        # hard timeout takes precedences
-        return self._ping_time < t < self._critical_time
+        """ Do we need to ping ? """
+        return self._ping_time < t
 
     def hardExpired(self, t):
-        """ Indicate if hard (or pong) timeout is reached """
-        # should be called if softExpired if False
-        return self._critical_time < t or self._ping_time < t
+        """ Have we reached pong latest arrival time, if set ? """
+        critical_time = self._critical_time
+        return critical_time is not None and critical_time < t
 
 
 class BaseConnection(object):
@@ -176,16 +219,23 @@
         event_manager.register(self)
 
     def checkTimeout(self, t):
-        if self._handlers.isPending():
-            if self._timeout.softExpired(t):
-                self._timeout.refresh(t)
-                self.ping()
+        handlers = self._handlers
+        if handlers.isPending():
+            msg_id = handlers.checkTimeout(t)
+            if msg_id is not None:
+                logging.info('timeout for %r with %s:%d', msg_id,
+                    *self.getAddress())
+                self.close()
+                self.getHandler().timeoutExpired(self)
             elif self._timeout.hardExpired(t):
                 # critical time reach or pong not received, abort
                 logging.info('timeout with %s:%d', *(self.getAddress()))
                 self.notify(Packets.Notify('Timeout'))
                 self.abort()
                 self.getHandler().timeoutExpired(self)
+            elif self._timeout.softExpired(t):
+                self._timeout.ping(t)
+                self.ping()
 
     def lock(self):
         return 1
@@ -272,7 +322,7 @@
             new_conn = ServerConnection(self.getEventManager(), handler,
                 connector=new_s, addr=addr)
             # A request for a node identification should arrive.
-            self._timeout.update(time(), timeout=INCOMING_TIMEOUT)
+            self._timeout.update(time())
             handler.connectionAccepted(new_conn)
         except ConnectorTryAgainException:
             pass
@@ -493,10 +543,11 @@
         msg_id = self._getNextId()
         packet.setId(msg_id)
         self._addPacket(packet)
+        t = time()
         # If there is no pending request, initialise timeout values.
         if not self._handlers.isPending():
-            self._timeout.update(time(), timeout=timeout)
-        self._handlers.emit(packet)
+            self._timeout.update(t)
+        self._handlers.emit(packet, t + timeout)
         return msg_id
 
     @not_closed
@@ -615,10 +666,11 @@
             packet.setId(msg_id)
             self.dispatcher.register(self, msg_id, self._local_var.queue)
             self._addPacket(packet)
+            t = time()
             # If there is no pending request, initialise timeout values.
             if not self._handlers.isPending():
-                self._timeout.update(time(), timeout=timeout)
-            self._handlers.emit(packet)
+                self._timeout.update(t)
+            self._handlers.emit(packet, t + timeout)
             return msg_id
         finally:
             self.unlock()

Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Thu Apr 22 17:55:09 2010
@@ -19,7 +19,7 @@
 from mock import Mock
 from neo.connection import ListeningConnection, Connection, \
      ClientConnection, ServerConnection, MTClientConnection, \
-     HandlerSwitcher, Timeout
+     HandlerSwitcher, Timeout, PING_DELAY, PING_TIMEOUT
 from neo.connector import getConnectorHandler, registerConnectorHandler
 from neo.tests import DoNothingConnector
 from neo.connector import ConnectorException, ConnectorTryAgainException, \
@@ -808,7 +808,7 @@
     def testEmit(self):
         self.assertFalse(self._handlers.isPending())
         request = self._makeRequest(1)
-        self._handlers.emit(request)
+        self._handlers.emit(request, 0)
         self.assertTrue(self._handlers.isPending())
 
     def testHandleNotification(self):
@@ -818,7 +818,7 @@
         self._checkPacketReceived(self._handler, notif1)
         # emit a request and delay an handler
         request = self._makeRequest(2)
-        self._handlers.emit(request)
+        self._handlers.emit(request, 0)
         handler = self._makeHandler()
         self._handlers.setHandler(handler)
         # next notification fall into the current handler
@@ -835,7 +835,7 @@
     def testHandleAnswer1(self):
         # handle with current handler
         request = self._makeRequest(1)
-        self._handlers.emit(request)
+        self._handlers.emit(request, 0)
         answer = self._makeAnswer(1)
         self._handlers.handle(answer)
         self._checkPacketReceived(self._handler, answer)
@@ -843,7 +843,7 @@
     def testHandleAnswer2(self):
         # handle with blocking handler
         request = self._makeRequest(1)
-        self._handlers.emit(request)
+        self._handlers.emit(request, 0)
         handler = self._makeHandler()
         self._handlers.setHandler(handler)
         answer = self._makeAnswer(1)
@@ -863,11 +863,11 @@
         h2 = self._makeHandler()
         h3 = self._makeHandler()
         # emit all requests and setHandleres
-        self._handlers.emit(r1)
+        self._handlers.emit(r1, 0)
         self._handlers.setHandler(h1)
-        self._handlers.emit(r2)
+        self._handlers.emit(r2, 0)
         self._handlers.setHandler(h2)
-        self._handlers.emit(r3)
+        self._handlers.emit(r3, 0)
         self._handlers.setHandler(h3)
         self._checkCurrentHandler(self._handler)
         self.assertTrue(self._handlers.isPending())
@@ -889,9 +889,9 @@
         a3 = self._makeAnswer(3)
         h = self._makeHandler()
         # emit all requests
-        self._handlers.emit(r1)
-        self._handlers.emit(r2)
-        self._handlers.emit(r3)
+        self._handlers.emit(r1, 0)
+        self._handlers.emit(r2, 0)
+        self._handlers.emit(r3, 0)
         self._handlers.setHandler(h)
         # process answers
         self._handlers.handle(a1)
@@ -908,59 +908,98 @@
         a2 = self._makeAnswer(2)
         h = self._makeHandler()
         # emit requests aroung state setHandler
-        self._handlers.emit(r1)
+        self._handlers.emit(r1, 0)
         self._handlers.setHandler(h)
-        self._handlers.emit(r2)
+        self._handlers.emit(r2, 0)
         # process answer for next state
         self._handlers.handle(a2)
         self.checkAborted(self._connection)
 
+    def testTimeout(self):
+        """
+          This timeout happens when a request has not been answered for longer
+          than a duration defined at emit() time.
+        """
+        now = time()
+        # No timeout when no pending request
+        self.assertEqual(self._handlers.checkTimeout(now), None)
+        # Prepare some requests
+        msg_id_1 = 1
+        msg_id_2 = 2
+        msg_id_3 = 3
+        r1 = self._makeRequest(msg_id_1)
+        a1 = self._makeAnswer(msg_id_1)
+        r2 = self._makeRequest(msg_id_2)
+        r3 = self._makeRequest(msg_id_3)
+        msg_1_time = now + 5
+        msg_2_time = msg_1_time + 5
+        msg_3_time = msg_2_time + 5
+        # Emit r3 before all other, to test that it's time parameter value
+        # which is used, not the registration order.
+        self._handlers.emit(r3, msg_3_time)
+        self._handlers.emit(r1, msg_1_time)
+        self._handlers.emit(r2, msg_2_time)
+        # No timeout before msg_1_time
+        self.assertEqual(self._handlers.checkTimeout(now), None)
+        # Timeout for msg_1 after msg_1_time
+        self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5),
+            msg_id_1)
+        # If msg_1 met its answer, no timeout after msg_1_time
+        self._handlers.handle(a1)
+        self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5), None)
+        # Next timeout is after msg_2_time
+        self.assertEqual(self._handlers.checkTimeout(msg_2_time + 0.5), msg_id_2)
 
 class TestTimeout(NeoTestBase):
-    """ assume PING_DELAY=5 """
-
     def setUp(self):
-        self.initial = time()
-        self.current = self.initial
+        self.current = time()
         self.timeout = Timeout()
-
-    def checkAfter(self, n, soft, hard):
+        self.timeout.update(self.current)
+
+    def _checkAt(self, n, soft, hard):
         at = self.current + n
         self.assertEqual(soft, self.timeout.softExpired(at))
         self.assertEqual(hard, self.timeout.hardExpired(at))
 
-    def refreshAfter(self, n):
-        self.current += n
-        self.timeout.refresh(self.current)
-
-    def testNoTimeout(self):
-        self.timeout.update(self.initial, 5)
-        self.checkAfter(1, False, False)
-        self.checkAfter(4, False, False)
-        self.refreshAfter(4) # answer received
-        self.checkAfter(1, False, False)
+    def _refreshAt(self, n):
+        self.timeout.refresh(self.current + n)
+
+    def _pingAt(self, n):
+        self.timeout.ping(self.current + n)
 
     def testSoftTimeout(self):
-        self.timeout.update(self.initial, 5)
-        self.checkAfter(1, False, False)
-        self.checkAfter(4, False, False)
-        self.checkAfter(6, True, True) # ping
-        self.refreshAfter(8) # pong
-        self.checkAfter(1, False, False)
-        self.checkAfter(4, False, True)
+        """
+          Soft timeout is when a ping should be sent to peer to see if it's
+          still responsive, after seing no life sign for PING_DELAY.
+        """
+        # Before PING_DELAY, no timeout.
+        self._checkAt(PING_DELAY - 0.5, False, False)
+        # If nothing came to refresh the timeout, soft timeout will be asserted
+        # after PING_DELAY.
+        self._checkAt(PING_DELAY + 0.5, True, False)
+        # If something refreshes the timeout, soft timeout will not be asserted
+        # after PING_DELAY.
+        answer_time = PING_DELAY - 0.5
+        self._refreshAt(answer_time)
+        self._checkAt(PING_DELAY + 0.5, False, False)
+        # ...but it will happen again after PING_DELAY after that answer
+        self._checkAt(answer_time + PING_DELAY + 0.5, True, False)
 
     def testHardTimeout(self):
-        self.timeout.update(self.initial, 5)
-        self.checkAfter(1, False, False)
-        self.checkAfter(4, False, False)
-        self.checkAfter(6, True, True) # ping
-        self.refreshAfter(6) # pong
-        self.checkAfter(1, False, False)
-        self.checkAfter(4, False, False)
-        self.checkAfter(6, False, True) # ping
-        self.refreshAfter(6) # pong
-        self.checkAfter(1, False, True) # too late
-        self.checkAfter(5, False, True)
+        """
+          Hard timeout is when a ping was sent, and any life sign must come
+          back to us before PING_TIMEOUT.
+        """
+        # A timeout triggered at PING_DELAY, so a ping was sent.
+        self._pingAt(PING_DELAY)
+        # Before PING_DELAY + PING_TIMEOUT, no timeout occurs.
+        self._checkAt(PING_DELAY + PING_TIMEOUT - 0.5, False, False)
+        # After PING_DELAY + PING_TIMEOUT, hard timeout occurs.
+        self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, True)
+        # If anything happened on the connection, there is no hard timeout
+        # anymore after PING_DELAY + PING_TIMEOUT.
+        self._refreshAt(PING_DELAY + PING_TIMEOUT - 0.5)
+        self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, False)
 
 if __name__ == '__main__':
     unittest.main()





More information about the Neo-report mailing list