[Neo-report] r2762 jm - in /trunk: ./ neo/client/handlers/ neo/lib/ neo/tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri May 27 18:53:16 CEST 2011


Author: jm
Date: Fri May 27 18:53:16 2011
New Revision: 2762

Log:
connection: reimplement timeout logic and redefine pings as a keep-alive feature

- Previous implementation was not able to import transactions with many small
  objects, because the client was faster to send a store request than to process
  its answer. If X is the difference of time for these 2 operations, the maximum
  number of objects a transaction could contain was CRITICAL_TIMEOUT / X.
  And HasLock feature can't act as a workaround because it is not working yet.
- Change API of 'on_timeout', which is currently only used by HasLock.
- Stop pinging when we wait for an answer. This wastes resources and would
  never recover any bad state.
- Make client connections send pings when they are idle instead.
  This implements keep-alive feature for high availability.
  Start with a non-configurable period of 60 seconds.
- Move processing of ping/pong to handlers.

Modified:
    trunk/TODO
    trunk/neo/client/handlers/__init__.py
    trunk/neo/lib/connection.py
    trunk/neo/lib/handler.py
    trunk/neo/tests/testConnection.py

Modified: trunk/TODO
==============================================================================
--- trunk/TODO [iso-8859-1] (original)
+++ trunk/TODO [iso-8859-1] Fri May 27 18:53:16 2011
@@ -20,7 +20,7 @@ RC  - Clarify cell state signification
 RC  - Review XXX in the code (CODE)
 RC  - Review TODO in the code (CODE)
 RC  - Review output of pylint (CODE)
-    - Keep-alive (HIGH AVAILABILITY)
+    - Keep-alive (HIGH AVAILABILITY) (implemented, to be reviewed and tested)
       Consider the need to implement a keep-alive system (packets sent
       automatically when there is no activity on the connection for a period
       of time).

Modified: trunk/neo/client/handlers/__init__.py
==============================================================================
--- trunk/neo/client/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/__init__.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -16,7 +16,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 from neo.lib.handler import EventHandler
-from neo.lib.protocol import ProtocolError
+from neo.lib.protocol import ProtocolError, Packets
 
 class BaseHandler(EventHandler):
     """Base class for client-side EventHandler implementations."""
@@ -37,10 +37,10 @@ class BaseHandler(EventHandler):
 
     def packetReceived(self, conn, packet):
         """Redirect all received packet to dispatcher thread."""
-        if packet.isResponse():
+        if packet.isResponse() and type(packet) is not Packets.Pong:
             if not self.dispatcher.dispatch(conn, packet.getId(), packet):
-                raise ProtocolError('Unexpected response packet from %r: %r',
-                        conn, packet)
+                raise ProtocolError('Unexpected response packet from %r: %r'
+                                    % (conn, packet))
         else:
             self.dispatch(conn, packet)
 

Modified: trunk/neo/lib/connection.py
==============================================================================
--- trunk/neo/lib/connection.py [iso-8859-1] (original)
+++ trunk/neo/lib/connection.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -32,9 +32,7 @@ from neo.lib import attributeTracker
 from neo.lib.util import ReadBuffer
 from neo.lib.profiling import profiler_decorator
 
-PING_DELAY = 6
-PING_TIMEOUT = 5
-INCOMING_TIMEOUT = 10
+KEEP_ALIVE = 60
 CRITICAL_TIMEOUT = 30
 
 class ConnectionClosed(Exception):
@@ -130,26 +128,19 @@ class HandlerSwitcher(object):
             self._next_on_timeout = on_timeout
         request_dict[msg_id] = (answer_class, timeout, on_timeout)
 
-    def checkTimeout(self, connection, t):
-        next_timeout = self._next_timeout
-        if next_timeout is not None and next_timeout < t:
-            msg_id = self._next_timeout_msg_id
-            if self._next_on_timeout is None:
-                result = msg_id
-            else:
-                if self._next_on_timeout(connection, msg_id):
-                    # Don't notify that a timeout occured, and forget about
-                    # this answer.
-                    for (request_dict, _) in self._pending:
-                        request_dict.pop(msg_id, None)
-                    self._updateNextTimeout()
-                    result = None
-                else:
-                    # Notify that a timeout occured
-                    result = msg_id
-        else:
-            result = None
-        return result
+    def getNextTimeout(self):
+        return self._next_timeout
+
+    def timeout(self, connection):
+        msg_id = self._next_timeout_msg_id
+        if self._next_on_timeout is not None:
+            self._next_on_timeout(connection, msg_id)
+            if self._next_timeout_msg_id != msg_id:
+                # on_timeout sent a packet with a smaller timeout
+                # so keep the connection open
+                return
+        # Notify that a timeout occured
+        return msg_id
 
     def handle(self, connection, packet):
         assert not self._is_handling
@@ -191,24 +182,18 @@ class HandlerSwitcher(object):
             neo.lib.logging.debug(
                             'Apply handler %r on %r', self._pending[0][1],
                     connection)
-        if timeout == self._next_timeout:
+        if msg_id == self._next_timeout_msg_id:
             self._updateNextTimeout()
 
     def _updateNextTimeout(self):
         # Find next timeout and its msg_id
-        timeout_list = []
-        extend = timeout_list.extend
-        for (request_dict, handler) in self._pending:
-            extend(((timeout, msg_id, on_timeout) \
-                for msg_id, (_, timeout, on_timeout) in \
-                request_dict.iteritems()))
-        if timeout_list:
-            timeout_list.sort(key=lambda x: x[0])
-            self._next_timeout, self._next_timeout_msg_id, \
-                self._next_on_timeout = timeout_list[0]
-        else:
-            self._next_timeout, self._next_timeout_msg_id, \
-                self._next_on_timeout = None, None, None
+        next_timeout = None
+        for pending in self._pending:
+            for msg_id, (_, timeout, on_timeout) in pending[0].iteritems():
+                if not next_timeout or timeout < next_timeout[0]:
+                    next_timeout = timeout, msg_id, on_timeout
+        self._next_timeout, self._next_timeout_msg_id, self._next_on_timeout = \
+            next_timeout or (None, None, None)
 
     @profiler_decorator
     def setHandler(self, handler):
@@ -222,53 +207,28 @@ class HandlerSwitcher(object):
         return can_apply
 
 
-class Timeout(object):
-    """ Keep track of connection-level timeouts """
-
-    def __init__(self):
-        self._ping_time = None
-        self._critical_time = None
-
-    def update(self, t, force=False):
-        """
-        Send occurred:
-        - set ping time if earlier than existing one
-        """
-        ping_time = self._ping_time
-        t += PING_DELAY
-        if force or ping_time is None or t < ping_time:
-            self._ping_time = t
-
-    def refresh(self, t):
-        """
-        Recv occured:
-        - reschedule next ping time
-        - as this is an evidence that node is alive, remove pong expectation
-        """
-        self._ping_time = t + PING_DELAY
-        self._critical_time = None
-
-    def ping(self, t):
-        """
-        Ping send occured:
-        - reschedule next ping time
-        - set pong expectation
-        """
-        self._ping_time = t + PING_DELAY
-        self._critical_time = t + PING_TIMEOUT
+class BaseConnection(object):
+    """A base connection
 
-    def softExpired(self, t):
-        """ Do we need to ping ? """
-        return self._ping_time < t
-
-    def hardExpired(self, t):
-        """ Have we reached pong latest arrival time, if set ? """
-        critical_time = self._critical_time
-        return critical_time is not None and critical_time < t
+    About timeouts:
 
+        Timeout are mainly per-connection instead of per-packet.
+        The idea is that most of time, packets are received and processed
+        sequentially, so if it takes a long for a peer to process a packet,
+        following packets would just be enqueued.
+        What really matters is that the peer makes progress in its work.
+        As long as we receive an answer, we consider it's still alive and
+        it may just have started to process the following request. So we reset
+        timeouts.
+        There is anyway nothing more we could do, because processing of a packet
+        may be delayed in a very unpredictable way depending of previously
+        received packets on peer side.
+        Even ourself may be slow to receive a packet. We must not timeout for
+        an answer that is already in our incoming buffer (read_buf or _queue).
+        Timeouts in HandlerSwitcher are only there to prioritize some packets.
+    """
 
-class BaseConnection(object):
-    """A base connection."""
+    _base_timeout = None
 
     def __init__(self, event_manager, handler, connector, addr=None):
         assert connector is not None, "Need a low-level connector"
@@ -276,28 +236,33 @@ class BaseConnection(object):
         self.connector = connector
         self.addr = addr
         self._handlers = HandlerSwitcher(handler)
-        self._timeout = Timeout()
         event_manager.register(self)
 
     def isPending(self):
         return self._handlers.isPending()
 
+    def updateTimeout(self, t=None):
+        if not self._queue:
+            if t:
+                self._base_timeout = t
+            self._timeout = self._handlers.getNextTimeout() or KEEP_ALIVE
+
     def checkTimeout(self, t):
-        handlers = self._handlers
-        if handlers.isPending():
-            msg_id = handlers.checkTimeout(self, t)
-            if msg_id is not None:
-                neo.lib.logging.info(
-                                'timeout for #0x%08x with %r', msg_id, self)
-                self.close()
-            elif self._timeout.hardExpired(t):
-                # critical time reach or pong not received, abort
-                neo.lib.logging.info('timeout with %r', self)
-                self.notify(Packets.Notify('Timeout'))
-                self.abort()
-            elif self._timeout.softExpired(t):
-                self._timeout.ping(t)
-                self.ping()
+        # first make sure we don't timeout on answers we already received
+        if self._base_timeout and not self._queue:
+            timeout = t - self._base_timeout
+            if self._timeout <= timeout:
+                handlers = self._handlers
+                if handlers.isPending():
+                    msg_id = handlers.timeout(self)
+                    if msg_id is None:
+                        self._base_timeout = t
+                    else:
+                        neo.lib.logging.info('timeout for #0x%08x with %r',
+                                             msg_id, self)
+                        self.close()
+                else:
+                    self.idle()
 
     def lock(self):
         return 1
@@ -381,6 +346,10 @@ class BaseConnection(object):
         """
         return attributeTracker.whoSet(self, 'connector')
 
+    def idle(self):
+        pass
+
+
 attributeTracker.track(BaseConnection)
 
 class ListeningConnection(BaseConnection):
@@ -400,8 +369,6 @@ class ListeningConnection(BaseConnection
             handler = self.getHandler()
             new_conn = ServerConnection(self.getEventManager(), handler,
                 connector=new_s, addr=addr)
-            # A request for a node identification should arrive.
-            self._timeout.update(time())
             handler.connectionAccepted(new_conn)
         except ConnectorTryAgainException:
             pass
@@ -421,9 +388,8 @@ class Connection(BaseConnection):
 
     connecting = False
 
-    def __init__(self, event_manager, handler, connector, addr=None):
-        BaseConnection.__init__(self, event_manager, handler,
-                                connector=connector, addr=addr)
+    def __init__(self, event_manager, *args, **kw):
+        BaseConnection.__init__(self, event_manager, *args, **kw)
         self.read_buf = ReadBuffer()
         self.write_buf = []
         self.cur_id = 0
@@ -493,19 +459,7 @@ class Connection(BaseConnection):
             except PacketMalformedError, msg:
                 self.getHandler()._packetMalformed(self, msg)
                 return
-            self._timeout.refresh(time())
-            packet_type = type(packet)
-            if packet_type is Packets.Ping:
-                # Send a pong notification
-                PACKET_LOGGER.dispatch(self, packet, False)
-                if not self.aborted:
-                    self.answer(Packets.Pong(), packet.getId())
-            elif packet_type is Packets.Pong:
-                # Skip PONG packets, its only purpose is refresh the timeout
-                # generated upong ping. But still log them.
-                PACKET_LOGGER.dispatch(self, packet, False)
-            else:
-                self._queue.append(packet)
+            self._queue.append(packet)
 
     def hasPendingMessages(self):
         """
@@ -520,6 +474,7 @@ class Connection(BaseConnection):
         # check out packet and process it with current handler
         packet = self._queue.pop(0)
         self._handlers.handle(self, packet)
+        self.updateTimeout()
 
     def pending(self):
         return self.connector is not None and self.write_buf
@@ -580,6 +535,7 @@ class Connection(BaseConnection):
                     'Connection %r closed in recv', self.connector)
                 self._closure()
                 return
+            self._base_timeout = time() # last known remote activity
             self.read_buf.append(data)
 
     @profiler_decorator
@@ -646,11 +602,10 @@ class Connection(BaseConnection):
         msg_id = self._getNextId()
         packet.setId(msg_id)
         self._addPacket(packet)
-        t = time()
-        # If there is no pending request, initialise timeout values.
-        if not self._handlers.isPending():
-            self._timeout.update(t, force=True)
-        self._handlers.emit(packet, t + timeout, on_timeout)
+        handlers = self._handlers
+        t = not handlers.isPending() and time() or None
+        handlers.emit(packet, timeout, on_timeout)
+        self.updateTimeout(t)
         return msg_id
 
     @not_closed
@@ -662,21 +617,14 @@ class Connection(BaseConnection):
         assert packet.isResponse(), packet
         self._addPacket(packet)
 
-    @not_closed
-    def ping(self):
-        packet = Packets.Ping()
-        packet.setId(self._getNextId())
-        self._addPacket(packet)
-
 
 class ClientConnection(Connection):
     """A connection from this node to a remote node."""
 
     connecting = True
 
-    def __init__(self, event_manager, handler, addr, connector, **kw):
-        Connection.__init__(self, event_manager, handler, addr=addr,
-                            connector=connector)
+    def __init__(self, event_manager, handler, addr, connector):
+        Connection.__init__(self, event_manager, handler, connector, addr)
         handler.connectionStarted(self)
         try:
             try:
@@ -685,6 +633,7 @@ class ClientConnection(Connection):
                 event_manager.addWriter(self)
             else:
                 self.connecting = False
+                self.updateTimeout(time())
                 self.getHandler().connectionCompleted(self)
         except ConnectorConnectionRefusedException:
             self._closure()
@@ -702,6 +651,7 @@ class ClientConnection(Connection):
                 return
             else:
                 self.connecting = False
+                self.updateTimeout(time())
                 self.getHandler().connectionCompleted(self)
                 self.em.addReader(self)
         else:
@@ -710,10 +660,17 @@ class ClientConnection(Connection):
     def isClient(self):
         return True
 
+    def idle(self):
+        self.ask(Packets.Ping())
+
 
 class ServerConnection(Connection):
     """A connection from a remote node to this node."""
 
+    def __init__(self, *args, **kw):
+        Connection.__init__(self, *args, **kw)
+        self.updateTimeout(time())
+
     def isServer(self):
         return True
 
@@ -778,11 +735,10 @@ class MTClientConnection(ClientConnectio
             else:
                 self.dispatcher.register(self, msg_id, queue)
             self._addPacket(packet)
-            t = time()
-            # If there is no pending request, initialise timeout values.
-            if not self._handlers.isPending():
-                self._timeout.update(t)
-            self._handlers.emit(packet, t + timeout, on_timeout)
+            handlers = self._handlers
+            t = not handlers.isPending() and time() or None
+            handlers.emit(packet, timeout, on_timeout)
+            self.updateTimeout(t)
             return msg_id
         finally:
             self.unlock()

Modified: trunk/neo/lib/handler.py
==============================================================================
--- trunk/neo/lib/handler.py [iso-8859-1] (original)
+++ trunk/neo/lib/handler.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -121,6 +121,15 @@ class EventHandler(object):
 
     # Packet handlers.
 
+    def ping(self, conn):
+        if not conn.isAborted():
+            conn.answer(Packets.Pong())
+
+    def pong(self, conn):
+        # Ignore PONG packets. The only purpose of ping/pong packets is
+        # to test/maintain underlying connection.
+        pass
+
     def notify(self, conn, message):
         neo.lib.logging.info('notification from %r: %s', conn, message)
 

Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 #
 # Copyright (C) 2009-2010  Nexedi SA
 #
@@ -19,11 +20,12 @@ from time import time
 from mock import Mock
 from neo.lib.connection import ListeningConnection, Connection, \
      ClientConnection, ServerConnection, MTClientConnection, \
-     HandlerSwitcher, Timeout, PING_DELAY, PING_TIMEOUT, OnTimeout
+     HandlerSwitcher, CRITICAL_TIMEOUT
 from neo.lib.connector import getConnectorHandler, registerConnectorHandler
 from neo.tests import DoNothingConnector
 from neo.lib.connector import ConnectorException, ConnectorTryAgainException, \
      ConnectorInProgressException, ConnectorConnectionRefusedException
+from neo.lib.handler import EventHandler
 from neo.lib.protocol import Packets, ParserState
 from neo.tests import NeoUnitTestBase
 from neo.lib.util import ReadBuffer
@@ -502,40 +504,6 @@ class ConnectionTests(NeoUnitTestBase):
         self.assertEqual(data.decode(), p.decode())
         self._checkReadBuf(bc, '')
 
-    def test_Connection_analyse5(self):
-        # test ping handling
-        bc = self._makeConnection()
-        bc._queue = Mock()
-        p = Packets.Ping()
-        p.setId(1)
-        self._appendPacketToReadBuf(bc, p)
-        bc.analyse()
-        # check no packet was queued
-        self.assertEqual(len(bc._queue.mockGetNamedCalls("append")), 0)
-        # check pong answered
-        parser_state = ParserState()
-        buffer = ReadBuffer()
-        for chunk in bc.write_buf:
-            buffer.append(chunk)
-        answer = Packets.parse(buffer, parser_state)
-        self.assertTrue(answer is not None)
-        self.assertTrue(type(answer) == Packets.Pong)
-        self.assertEqual(answer.getId(), p.getId())
-
-    def test_Connection_analyse6(self):
-        # test pong handling
-        bc = self._makeConnection()
-        bc._timeout = Mock()
-        bc._queue = Mock()
-        p = Packets.Pong()
-        p.setId(1)
-        self._appendPacketToReadBuf(bc, p)
-        bc.analyse()
-        # check no packet was queued
-        self.assertEqual(len(bc._queue.mockGetNamedCalls("append")), 0)
-        # check timeout has been refreshed
-        self.assertEqual(len(bc._timeout.mockGetNamedCalls("refresh")), 1)
-
     def test_Connection_writable1(self):
         # with  pending operation after send
         def send(self, data):
@@ -800,6 +768,68 @@ class ConnectionTests(NeoUnitTestBase):
         next_id = bc._getNextId()
         self.assertEqual(next_id, 0)
 
+    def test_15_Timeout(self):
+        # NOTE: This method uses ping/pong packets only because MT connection
+        #       don't accept any other packet without specifying a queue.
+        self.handler = EventHandler(self.app)
+        conn = self._makeClientConnection()
+
+        use_case_list = (
+            # (a) For a single packet sent at T,
+            #     the limit time for the answer is T + (1 * CRITICAL_TIMEOUT)
+            ((), (1., 0)),
+            # (b) Same as (a), even if send another packet at (T + CT/2).
+            #     But receiving a packet (at T + CT - ε) resets the timeout
+            #     (which means the limit for the 2nd one is T + 2*CT)
+            ((.5, None), (1., 0, 2., 1)),
+            # (c) Same as (b) with a first answer at well before the limit
+            #     (T' = T + CT/2). The limit for the second one is T' + CT.
+            ((.1, None, .5, 1), (1.5, 0)),
+        )
+
+        from neo.lib import connection
+        def set_time(t):
+            connection.time = lambda: int(CRITICAL_TIMEOUT * (1000 + t))
+        closed = []
+        conn.close = lambda: closed.append(connection.time())
+        def answer(packet_id):
+            p = Packets.Pong()
+            p.setId(packet_id)
+            conn.connector.receive = [''.join(p.encode())].pop
+            conn.readable()
+            conn.checkTimeout(connection.time())
+            conn.process()
+        try:
+            for use_case, expected in use_case_list:
+                i = iter(use_case)
+                conn.cur_id = 0
+                set_time(0)
+                # No timeout when no pending request
+                self.assertEqual(conn._handlers.getNextTimeout(), None)
+                conn.ask(Packets.Ping())
+                for t in i:
+                    set_time(t)
+                    conn.checkTimeout(connection.time())
+                    packet_id = i.next()
+                    if packet_id is None:
+                        conn.ask(Packets.Ping())
+                    else:
+                        answer(packet_id)
+                i = iter(expected)
+                for t in i:
+                    set_time(t - .1)
+                    conn.checkTimeout(connection.time())
+                    set_time(t)
+                    # this test method relies on the fact that only
+                    # conn.close is called in case of a timeout
+                    conn.checkTimeout(connection.time())
+                    self.assertEqual(closed.pop(), connection.time())
+                    answer(i.next())
+                self.assertFalse(conn.isPending())
+                self.assertFalse(closed)
+        finally:
+            connection.time = time
+
 class MTConnectionTests(ConnectionTests):
     # XXX: here we test non-client-connection-related things too, which
     # duplicates test suite work... Should be fragmented into finer-grained
@@ -1005,142 +1035,6 @@ class HandlerSwitcherTests(NeoUnitTestBa
         self._handlers.handle(self._connection, a2)
         self.checkAborted(self._connection)
 
-    def testTimeout(self):
-        """
-          This timeout happens when a request has not been answered for longer
-          than a duration defined at emit() time.
-        """
-        now = time()
-        # No timeout when no pending request
-        self.assertEqual(self._handlers.checkTimeout(self._connection, now),
-            None)
-        # Prepare some requests
-        msg_id_1 = 1
-        msg_id_2 = 2
-        msg_id_3 = 3
-        msg_id_4 = 4
-        r1 = self._makeRequest(msg_id_1)
-        a1 = self._makeAnswer(msg_id_1)
-        r2 = self._makeRequest(msg_id_2)
-        a2 = self._makeAnswer(msg_id_2)
-        r3 = self._makeRequest(msg_id_3)
-        r4 = self._makeRequest(msg_id_4)
-        msg_1_time = now + 5
-        msg_2_time = msg_1_time + 5
-        msg_3_time = msg_2_time + 5
-        msg_4_time = msg_3_time + 5
-        markers = []
-        def msg_3_on_timeout(conn, msg_id):
-            markers.append((3, conn, msg_id))
-            return True
-        def msg_4_on_timeout(conn, msg_id):
-            markers.append((4, conn, msg_id))
-            return False
-        # Emit r3 before all other, to test that it's time parameter value
-        # which is used, not the registration order.
-        self._handlers.emit(r3, msg_3_time, OnTimeout(msg_3_on_timeout))
-        self._handlers.emit(r1, msg_1_time, None)
-        self._handlers.emit(r2, msg_2_time, None)
-        # No timeout before msg_1_time
-        self.assertEqual(self._handlers.checkTimeout(self._connection, now),
-            None)
-        # Timeout for msg_1 after msg_1_time
-        self.assertEqual(self._handlers.checkTimeout(self._connection,
-            msg_1_time + 0.5), msg_id_1)
-        # If msg_1 met its answer, no timeout after msg_1_time
-        self._handlers.handle(self._connection, a1)
-        self.assertEqual(self._handlers.checkTimeout(self._connection,
-            msg_1_time + 0.5), None)
-        # Next timeout is after msg_2_time
-        self.assertEqual(self._handlers.checkTimeout(self._connection,
-            msg_2_time + 0.5), msg_id_2)
-        self._handlers.handle(self._connection, a2)
-        # Sanity check
-        self.assertEqual(self._handlers.checkTimeout(self._connection,
-            msg_2_time + 0.5), None)
-        # msg_3 timeout will fire msg_3_on_timeout callback, which causes the
-        # timeout to be ignored (it returns True)
-        self.assertEqual(self._handlers.checkTimeout(self._connection,
-            msg_3_time + 0.5), None)
-        # ...check that callback actually fired
-        self.assertEqual(len(markers), 1)
-        # ...with expected parameters
-        self.assertEqual(markers[0], (3, self._connection, msg_id_3))
-        # answer to msg_3 must not be expected anymore (and it was the last
-        # expected message)
-        self.assertFalse(self._handlers.isPending())
-        del markers[:]
-        self._handlers.emit(r4, msg_4_time, OnTimeout(msg_4_on_timeout))
-        # msg_4 timeout will fire msg_4_on_timeout callback, which lets the
-        # timeout be detected (it returns False)
-        self.assertEqual(self._handlers.checkTimeout(self._connection,
-            msg_4_time + 0.5), msg_id_4)
-        # ...check that callback actually fired
-        self.assertEqual(len(markers), 1)
-        # ...with expected parameters
-        self.assertEqual(markers[0], (4, self._connection, msg_id_4))
-
-class TestTimeout(NeoUnitTestBase):
-    def setUp(self):
-        NeoUnitTestBase.setUp(self)
-        self.current = time()
-        self.timeout = Timeout()
-        self._updateAt(0)
-        self.assertTrue(PING_DELAY > PING_TIMEOUT) # Sanity check
-
-    def _checkAt(self, n, soft, hard):
-        at = self.current + n
-        self.assertEqual(soft, self.timeout.softExpired(at))
-        self.assertEqual(hard, self.timeout.hardExpired(at))
-
-    def _updateAt(self, n, force=False):
-        self.timeout.update(self.current + n, force=force)
-
-    def _refreshAt(self, n):
-        self.timeout.refresh(self.current + n)
-
-    def _pingAt(self, n):
-        self.timeout.ping(self.current + n)
-
-    def testSoftTimeout(self):
-        """
-          Soft timeout is when a ping should be sent to peer to see if it's
-          still responsive, after seing no life sign for PING_DELAY.
-        """
-        # Before PING_DELAY, no timeout.
-        self._checkAt(PING_DELAY - 0.5, False, False)
-        # If nothing came to refresh the timeout, soft timeout will be asserted
-        # after PING_DELAY.
-        self._checkAt(PING_DELAY + 0.5, True, False)
-        # If something refreshes the timeout, soft timeout will not be asserted
-        # after PING_DELAY.
-        answer_time = PING_DELAY - 0.5
-        self._refreshAt(answer_time)
-        self._checkAt(PING_DELAY + 0.5, False, False)
-        # ...but it will happen again after PING_DELAY after that answer
-        self._checkAt(answer_time + PING_DELAY + 0.5, True, False)
-        # if there is no more pending requests, a clear will happen so next
-        # send doesn't immediately trigger a ping
-        new_request_time = answer_time + PING_DELAY * 2
-        self._updateAt(new_request_time, force=True)
-        self._checkAt(new_request_time + PING_DELAY - 0.5, False, False)
-        self._checkAt(new_request_time + PING_DELAY + 0.5, True, False)
-
-    def testHardTimeout(self):
-        """
-          Hard timeout is when a ping was sent, and any life sign must come
-          back to us before PING_TIMEOUT.
-        """
-        # A timeout triggered at PING_DELAY, so a ping was sent.
-        self._pingAt(PING_DELAY)
-        # Before PING_DELAY + PING_TIMEOUT, no timeout occurs.
-        self._checkAt(PING_DELAY + PING_TIMEOUT - 0.5, False, False)
-        # After PING_DELAY + PING_TIMEOUT, hard timeout occurs.
-        self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, True)
-        # If anything happened on the connection, there is no hard timeout
-        # anymore after PING_DELAY + PING_TIMEOUT.
-        self._refreshAt(PING_DELAY + PING_TIMEOUT - 0.5)
-        self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, False)
 
 if __name__ == '__main__':
     unittest.main()




More information about the Neo-report mailing list