[Neo-report] r2762 jm - in /trunk: ./ neo/client/handlers/ neo/lib/ neo/tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri May 27 18:53:16 CEST 2011
Author: jm
Date: Fri May 27 18:53:16 2011
New Revision: 2762
Log:
connection: reimplement timeout logic and redefine pings as a keep-alive feature
- Previous implementation was not able to import transactions with many small
objects, the client for faster to send a store request than to process its
answer. If X is the difference of time for these 2 operations, the maximum
number of objects a transaction could contain was CRITICAL_TIMEOUT / X.
And HasLock feature can't act as a workaround because it is not working yet.
- Change API of 'on_timeout', which currently only used by HasLock.
- Stop pinging when we wait for an answer. This wastes resources and would
never recover any bad state.
- Make client connections send pings when they are idle instead.
This implements keep-alive feature for high availability.
Start with an non-configurable period of 60 seconds.
- Move processing of ping/pong to handlers.
Modified:
trunk/TODO
trunk/neo/client/handlers/__init__.py
trunk/neo/lib/connection.py
trunk/neo/lib/handler.py
trunk/neo/tests/testConnection.py
Modified: trunk/TODO
==============================================================================
--- trunk/TODO [iso-8859-1] (original)
+++ trunk/TODO [iso-8859-1] Fri May 27 18:53:16 2011
@@ -20,7 +20,7 @@ RC - Clarify cell state signification
RC - Review XXX in the code (CODE)
RC - Review TODO in the code (CODE)
RC - Review output of pylint (CODE)
- - Keep-alive (HIGH AVAILABILITY)
+ - Keep-alive (HIGH AVAILABILITY) (implemented, to be reviewed and tested)
Consider the need to implement a keep-alive system (packets sent
automatically when there is no activity on the connection for a period
of time).
Modified: trunk/neo/client/handlers/__init__.py
==============================================================================
--- trunk/neo/client/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/__init__.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.lib.handler import EventHandler
-from neo.lib.protocol import ProtocolError
+from neo.lib.protocol import ProtocolError, Packets
class BaseHandler(EventHandler):
"""Base class for client-side EventHandler implementations."""
@@ -37,10 +37,10 @@ class BaseHandler(EventHandler):
def packetReceived(self, conn, packet):
"""Redirect all received packet to dispatcher thread."""
- if packet.isResponse():
+ if packet.isResponse() and type(packet) is not Packets.Pong:
if not self.dispatcher.dispatch(conn, packet.getId(), packet):
- raise ProtocolError('Unexpected response packet from %r: %r',
- conn, packet)
+ raise ProtocolError('Unexpected response packet from %r: %r'
+ % (conn, packet))
else:
self.dispatch(conn, packet)
Modified: trunk/neo/lib/connection.py
==============================================================================
--- trunk/neo/lib/connection.py [iso-8859-1] (original)
+++ trunk/neo/lib/connection.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -32,9 +32,7 @@ from neo.lib import attributeTracker
from neo.lib.util import ReadBuffer
from neo.lib.profiling import profiler_decorator
-PING_DELAY = 6
-PING_TIMEOUT = 5
-INCOMING_TIMEOUT = 10
+KEEP_ALIVE = 60
CRITICAL_TIMEOUT = 30
class ConnectionClosed(Exception):
@@ -130,26 +128,19 @@ class HandlerSwitcher(object):
self._next_on_timeout = on_timeout
request_dict[msg_id] = (answer_class, timeout, on_timeout)
- def checkTimeout(self, connection, t):
- next_timeout = self._next_timeout
- if next_timeout is not None and next_timeout < t:
- msg_id = self._next_timeout_msg_id
- if self._next_on_timeout is None:
- result = msg_id
- else:
- if self._next_on_timeout(connection, msg_id):
- # Don't notify that a timeout occured, and forget about
- # this answer.
- for (request_dict, _) in self._pending:
- request_dict.pop(msg_id, None)
- self._updateNextTimeout()
- result = None
- else:
- # Notify that a timeout occured
- result = msg_id
- else:
- result = None
- return result
+ def getNextTimeout(self):
+ return self._next_timeout
+
+ def timeout(self, connection):
+ msg_id = self._next_timeout_msg_id
+ if self._next_on_timeout is not None:
+ self._next_on_timeout(connection, msg_id)
+ if self._next_timeout_msg_id != msg_id:
+ # on_timeout sent a packet with a smaller timeout
+ # so keep the connection open
+ return
+ # Notify that a timeout occured
+ return msg_id
def handle(self, connection, packet):
assert not self._is_handling
@@ -191,24 +182,18 @@ class HandlerSwitcher(object):
neo.lib.logging.debug(
'Apply handler %r on %r', self._pending[0][1],
connection)
- if timeout == self._next_timeout:
+ if msg_id == self._next_timeout_msg_id:
self._updateNextTimeout()
def _updateNextTimeout(self):
# Find next timeout and its msg_id
- timeout_list = []
- extend = timeout_list.extend
- for (request_dict, handler) in self._pending:
- extend(((timeout, msg_id, on_timeout) \
- for msg_id, (_, timeout, on_timeout) in \
- request_dict.iteritems()))
- if timeout_list:
- timeout_list.sort(key=lambda x: x[0])
- self._next_timeout, self._next_timeout_msg_id, \
- self._next_on_timeout = timeout_list[0]
- else:
- self._next_timeout, self._next_timeout_msg_id, \
- self._next_on_timeout = None, None, None
+ next_timeout = None
+ for pending in self._pending:
+ for msg_id, (_, timeout, on_timeout) in pending[0].iteritems():
+ if not next_timeout or timeout < next_timeout[0]:
+ next_timeout = timeout, msg_id, on_timeout
+ self._next_timeout, self._next_timeout_msg_id, self._next_on_timeout = \
+ next_timeout or (None, None, None)
@profiler_decorator
def setHandler(self, handler):
@@ -222,53 +207,28 @@ class HandlerSwitcher(object):
return can_apply
-class Timeout(object):
- """ Keep track of connection-level timeouts """
-
- def __init__(self):
- self._ping_time = None
- self._critical_time = None
-
- def update(self, t, force=False):
- """
- Send occurred:
- - set ping time if earlier than existing one
- """
- ping_time = self._ping_time
- t += PING_DELAY
- if force or ping_time is None or t < ping_time:
- self._ping_time = t
-
- def refresh(self, t):
- """
- Recv occured:
- - reschedule next ping time
- - as this is an evidence that node is alive, remove pong expectation
- """
- self._ping_time = t + PING_DELAY
- self._critical_time = None
-
- def ping(self, t):
- """
- Ping send occured:
- - reschedule next ping time
- - set pong expectation
- """
- self._ping_time = t + PING_DELAY
- self._critical_time = t + PING_TIMEOUT
+class BaseConnection(object):
+ """A base connection
- def softExpired(self, t):
- """ Do we need to ping ? """
- return self._ping_time < t
-
- def hardExpired(self, t):
- """ Have we reached pong latest arrival time, if set ? """
- critical_time = self._critical_time
- return critical_time is not None and critical_time < t
+ About timeouts:
+ Timeout are mainly per-connection instead of per-packet.
+ The idea is that most of time, packets are received and processed
+ sequentially, so if it takes a long for a peer to process a packet,
+ following packets would just be enqueued.
+ What really matters is that the peer makes progress in its work.
+ As long as we receive an answer, we consider it's still alive and
+ it may just have started to process the following request. So we reset
+ timeouts.
+ There is anyway nothing more we could do, because processing of a packet
+ may be delayed in a very unpredictable way depending of previously
+ received packets on peer side.
+ Even ourself may be slow to receive a packet. We must not timeout for
+ an answer that is already in our incoming buffer (read_buf or _queue).
+ Timeouts in HandlerSwitcher are only there to prioritize some packets.
+ """
-class BaseConnection(object):
- """A base connection."""
+ _base_timeout = None
def __init__(self, event_manager, handler, connector, addr=None):
assert connector is not None, "Need a low-level connector"
@@ -276,28 +236,33 @@ class BaseConnection(object):
self.connector = connector
self.addr = addr
self._handlers = HandlerSwitcher(handler)
- self._timeout = Timeout()
event_manager.register(self)
def isPending(self):
return self._handlers.isPending()
+ def updateTimeout(self, t=None):
+ if not self._queue:
+ if t:
+ self._base_timeout = t
+ self._timeout = self._handlers.getNextTimeout() or KEEP_ALIVE
+
def checkTimeout(self, t):
- handlers = self._handlers
- if handlers.isPending():
- msg_id = handlers.checkTimeout(self, t)
- if msg_id is not None:
- neo.lib.logging.info(
- 'timeout for #0x%08x with %r', msg_id, self)
- self.close()
- elif self._timeout.hardExpired(t):
- # critical time reach or pong not received, abort
- neo.lib.logging.info('timeout with %r', self)
- self.notify(Packets.Notify('Timeout'))
- self.abort()
- elif self._timeout.softExpired(t):
- self._timeout.ping(t)
- self.ping()
+ # first make sure we don't timeout on answers we already received
+ if self._base_timeout and not self._queue:
+ timeout = t - self._base_timeout
+ if self._timeout <= timeout:
+ handlers = self._handlers
+ if handlers.isPending():
+ msg_id = handlers.timeout(self)
+ if msg_id is None:
+ self._base_timeout = t
+ else:
+ neo.lib.logging.info('timeout for #0x%08x with %r',
+ msg_id, self)
+ self.close()
+ else:
+ self.idle()
def lock(self):
return 1
@@ -381,6 +346,10 @@ class BaseConnection(object):
"""
return attributeTracker.whoSet(self, 'connector')
+ def idle(self):
+ pass
+
+
attributeTracker.track(BaseConnection)
class ListeningConnection(BaseConnection):
@@ -400,8 +369,6 @@ class ListeningConnection(BaseConnection
handler = self.getHandler()
new_conn = ServerConnection(self.getEventManager(), handler,
connector=new_s, addr=addr)
- # A request for a node identification should arrive.
- self._timeout.update(time())
handler.connectionAccepted(new_conn)
except ConnectorTryAgainException:
pass
@@ -421,9 +388,8 @@ class Connection(BaseConnection):
connecting = False
- def __init__(self, event_manager, handler, connector, addr=None):
- BaseConnection.__init__(self, event_manager, handler,
- connector=connector, addr=addr)
+ def __init__(self, event_manager, *args, **kw):
+ BaseConnection.__init__(self, event_manager, *args, **kw)
self.read_buf = ReadBuffer()
self.write_buf = []
self.cur_id = 0
@@ -493,19 +459,7 @@ class Connection(BaseConnection):
except PacketMalformedError, msg:
self.getHandler()._packetMalformed(self, msg)
return
- self._timeout.refresh(time())
- packet_type = type(packet)
- if packet_type is Packets.Ping:
- # Send a pong notification
- PACKET_LOGGER.dispatch(self, packet, False)
- if not self.aborted:
- self.answer(Packets.Pong(), packet.getId())
- elif packet_type is Packets.Pong:
- # Skip PONG packets, its only purpose is refresh the timeout
- # generated upong ping. But still log them.
- PACKET_LOGGER.dispatch(self, packet, False)
- else:
- self._queue.append(packet)
+ self._queue.append(packet)
def hasPendingMessages(self):
"""
@@ -520,6 +474,7 @@ class Connection(BaseConnection):
# check out packet and process it with current handler
packet = self._queue.pop(0)
self._handlers.handle(self, packet)
+ self.updateTimeout()
def pending(self):
return self.connector is not None and self.write_buf
@@ -580,6 +535,7 @@ class Connection(BaseConnection):
'Connection %r closed in recv', self.connector)
self._closure()
return
+ self._base_timeout = time() # last known remote activity
self.read_buf.append(data)
@profiler_decorator
@@ -646,11 +602,10 @@ class Connection(BaseConnection):
msg_id = self._getNextId()
packet.setId(msg_id)
self._addPacket(packet)
- t = time()
- # If there is no pending request, initialise timeout values.
- if not self._handlers.isPending():
- self._timeout.update(t, force=True)
- self._handlers.emit(packet, t + timeout, on_timeout)
+ handlers = self._handlers
+ t = not handlers.isPending() and time() or None
+ handlers.emit(packet, timeout, on_timeout)
+ self.updateTimeout(t)
return msg_id
@not_closed
@@ -662,21 +617,14 @@ class Connection(BaseConnection):
assert packet.isResponse(), packet
self._addPacket(packet)
- @not_closed
- def ping(self):
- packet = Packets.Ping()
- packet.setId(self._getNextId())
- self._addPacket(packet)
-
class ClientConnection(Connection):
"""A connection from this node to a remote node."""
connecting = True
- def __init__(self, event_manager, handler, addr, connector, **kw):
- Connection.__init__(self, event_manager, handler, addr=addr,
- connector=connector)
+ def __init__(self, event_manager, handler, addr, connector):
+ Connection.__init__(self, event_manager, handler, connector, addr)
handler.connectionStarted(self)
try:
try:
@@ -685,6 +633,7 @@ class ClientConnection(Connection):
event_manager.addWriter(self)
else:
self.connecting = False
+ self.updateTimeout(time())
self.getHandler().connectionCompleted(self)
except ConnectorConnectionRefusedException:
self._closure()
@@ -702,6 +651,7 @@ class ClientConnection(Connection):
return
else:
self.connecting = False
+ self.updateTimeout(time())
self.getHandler().connectionCompleted(self)
self.em.addReader(self)
else:
@@ -710,10 +660,17 @@ class ClientConnection(Connection):
def isClient(self):
return True
+ def idle(self):
+ self.ask(Packets.Ping())
+
class ServerConnection(Connection):
"""A connection from a remote node to this node."""
+ def __init__(self, *args, **kw):
+ Connection.__init__(self, *args, **kw)
+ self.updateTimeout(time())
+
def isServer(self):
return True
@@ -778,11 +735,10 @@ class MTClientConnection(ClientConnectio
else:
self.dispatcher.register(self, msg_id, queue)
self._addPacket(packet)
- t = time()
- # If there is no pending request, initialise timeout values.
- if not self._handlers.isPending():
- self._timeout.update(t)
- self._handlers.emit(packet, t + timeout, on_timeout)
+ handlers = self._handlers
+ t = not handlers.isPending() and time() or None
+ handlers.emit(packet, timeout, on_timeout)
+ self.updateTimeout(t)
return msg_id
finally:
self.unlock()
Modified: trunk/neo/lib/handler.py
==============================================================================
--- trunk/neo/lib/handler.py [iso-8859-1] (original)
+++ trunk/neo/lib/handler.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -121,6 +121,15 @@ class EventHandler(object):
# Packet handlers.
+ def ping(self, conn):
+ if not conn.isAborted():
+ conn.answer(Packets.Pong())
+
+ def pong(self, conn):
+ # Ignore PONG packets. The only purpose of ping/pong packets is
+ # to test/maintain underlying connection.
+ pass
+
def notify(self, conn, message):
neo.lib.logging.info('notification from %r: %s', conn, message)
Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Fri May 27 18:53:16 2011
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
#
# Copyright (C) 2009-2010 Nexedi SA
#
@@ -19,11 +20,12 @@ from time import time
from mock import Mock
from neo.lib.connection import ListeningConnection, Connection, \
ClientConnection, ServerConnection, MTClientConnection, \
- HandlerSwitcher, Timeout, PING_DELAY, PING_TIMEOUT, OnTimeout
+ HandlerSwitcher, CRITICAL_TIMEOUT
from neo.lib.connector import getConnectorHandler, registerConnectorHandler
from neo.tests import DoNothingConnector
from neo.lib.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException
+from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, ParserState
from neo.tests import NeoUnitTestBase
from neo.lib.util import ReadBuffer
@@ -502,40 +504,6 @@ class ConnectionTests(NeoUnitTestBase):
self.assertEqual(data.decode(), p.decode())
self._checkReadBuf(bc, '')
- def test_Connection_analyse5(self):
- # test ping handling
- bc = self._makeConnection()
- bc._queue = Mock()
- p = Packets.Ping()
- p.setId(1)
- self._appendPacketToReadBuf(bc, p)
- bc.analyse()
- # check no packet was queued
- self.assertEqual(len(bc._queue.mockGetNamedCalls("append")), 0)
- # check pong answered
- parser_state = ParserState()
- buffer = ReadBuffer()
- for chunk in bc.write_buf:
- buffer.append(chunk)
- answer = Packets.parse(buffer, parser_state)
- self.assertTrue(answer is not None)
- self.assertTrue(type(answer) == Packets.Pong)
- self.assertEqual(answer.getId(), p.getId())
-
- def test_Connection_analyse6(self):
- # test pong handling
- bc = self._makeConnection()
- bc._timeout = Mock()
- bc._queue = Mock()
- p = Packets.Pong()
- p.setId(1)
- self._appendPacketToReadBuf(bc, p)
- bc.analyse()
- # check no packet was queued
- self.assertEqual(len(bc._queue.mockGetNamedCalls("append")), 0)
- # check timeout has been refreshed
- self.assertEqual(len(bc._timeout.mockGetNamedCalls("refresh")), 1)
-
def test_Connection_writable1(self):
# with pending operation after send
def send(self, data):
@@ -800,6 +768,68 @@ class ConnectionTests(NeoUnitTestBase):
next_id = bc._getNextId()
self.assertEqual(next_id, 0)
+ def test_15_Timeout(self):
+ # NOTE: This method uses ping/pong packets only because MT connection
+ # don't accept any other packet without specifying a queue.
+ self.handler = EventHandler(self.app)
+ conn = self._makeClientConnection()
+
+ use_case_list = (
+ # (a) For a single packet sent at T,
+ # the limit time for the answer is T + (1 * CRITICAL_TIMEOUT)
+ ((), (1., 0)),
+ # (b) Same as (a), even if send another packet at (T + CT/2).
+ # But receiving a packet (at T + CT - ε) resets the timeout
+ # (which means the limit for the 2nd one is T + 2*CT)
+ ((.5, None), (1., 0, 2., 1)),
+ # (c) Same as (b) with a first answer at well before the limit
+ # (T' = T + CT/2). The limit for the second one is T' + CT.
+ ((.1, None, .5, 1), (1.5, 0)),
+ )
+
+ from neo.lib import connection
+ def set_time(t):
+ connection.time = lambda: int(CRITICAL_TIMEOUT * (1000 + t))
+ closed = []
+ conn.close = lambda: closed.append(connection.time())
+ def answer(packet_id):
+ p = Packets.Pong()
+ p.setId(packet_id)
+ conn.connector.receive = [''.join(p.encode())].pop
+ conn.readable()
+ conn.checkTimeout(connection.time())
+ conn.process()
+ try:
+ for use_case, expected in use_case_list:
+ i = iter(use_case)
+ conn.cur_id = 0
+ set_time(0)
+ # No timeout when no pending request
+ self.assertEqual(conn._handlers.getNextTimeout(), None)
+ conn.ask(Packets.Ping())
+ for t in i:
+ set_time(t)
+ conn.checkTimeout(connection.time())
+ packet_id = i.next()
+ if packet_id is None:
+ conn.ask(Packets.Ping())
+ else:
+ answer(packet_id)
+ i = iter(expected)
+ for t in i:
+ set_time(t - .1)
+ conn.checkTimeout(connection.time())
+ set_time(t)
+ # this test method relies on the fact that only
+ # conn.close is called in case of a timeout
+ conn.checkTimeout(connection.time())
+ self.assertEqual(closed.pop(), connection.time())
+ answer(i.next())
+ self.assertFalse(conn.isPending())
+ self.assertFalse(closed)
+ finally:
+ connection.time = time
+
class MTConnectionTests(ConnectionTests):
# XXX: here we test non-client-connection-related things too, which
# duplicates test suite work... Should be fragmented into finer-grained
@@ -1005,142 +1035,6 @@ class HandlerSwitcherTests(NeoUnitTestBa
self._handlers.handle(self._connection, a2)
self.checkAborted(self._connection)
- def testTimeout(self):
- """
- This timeout happens when a request has not been answered for longer
- than a duration defined at emit() time.
- """
- now = time()
- # No timeout when no pending request
- self.assertEqual(self._handlers.checkTimeout(self._connection, now),
- None)
- # Prepare some requests
- msg_id_1 = 1
- msg_id_2 = 2
- msg_id_3 = 3
- msg_id_4 = 4
- r1 = self._makeRequest(msg_id_1)
- a1 = self._makeAnswer(msg_id_1)
- r2 = self._makeRequest(msg_id_2)
- a2 = self._makeAnswer(msg_id_2)
- r3 = self._makeRequest(msg_id_3)
- r4 = self._makeRequest(msg_id_4)
- msg_1_time = now + 5
- msg_2_time = msg_1_time + 5
- msg_3_time = msg_2_time + 5
- msg_4_time = msg_3_time + 5
- markers = []
- def msg_3_on_timeout(conn, msg_id):
- markers.append((3, conn, msg_id))
- return True
- def msg_4_on_timeout(conn, msg_id):
- markers.append((4, conn, msg_id))
- return False
- # Emit r3 before all other, to test that it's time parameter value
- # which is used, not the registration order.
- self._handlers.emit(r3, msg_3_time, OnTimeout(msg_3_on_timeout))
- self._handlers.emit(r1, msg_1_time, None)
- self._handlers.emit(r2, msg_2_time, None)
- # No timeout before msg_1_time
- self.assertEqual(self._handlers.checkTimeout(self._connection, now),
- None)
- # Timeout for msg_1 after msg_1_time
- self.assertEqual(self._handlers.checkTimeout(self._connection,
- msg_1_time + 0.5), msg_id_1)
- # If msg_1 met its answer, no timeout after msg_1_time
- self._handlers.handle(self._connection, a1)
- self.assertEqual(self._handlers.checkTimeout(self._connection,
- msg_1_time + 0.5), None)
- # Next timeout is after msg_2_time
- self.assertEqual(self._handlers.checkTimeout(self._connection,
- msg_2_time + 0.5), msg_id_2)
- self._handlers.handle(self._connection, a2)
- # Sanity check
- self.assertEqual(self._handlers.checkTimeout(self._connection,
- msg_2_time + 0.5), None)
- # msg_3 timeout will fire msg_3_on_timeout callback, which causes the
- # timeout to be ignored (it returns True)
- self.assertEqual(self._handlers.checkTimeout(self._connection,
- msg_3_time + 0.5), None)
- # ...check that callback actually fired
- self.assertEqual(len(markers), 1)
- # ...with expected parameters
- self.assertEqual(markers[0], (3, self._connection, msg_id_3))
- # answer to msg_3 must not be expected anymore (and it was the last
- # expected message)
- self.assertFalse(self._handlers.isPending())
- del markers[:]
- self._handlers.emit(r4, msg_4_time, OnTimeout(msg_4_on_timeout))
- # msg_4 timeout will fire msg_4_on_timeout callback, which lets the
- # timeout be detected (it returns False)
- self.assertEqual(self._handlers.checkTimeout(self._connection,
- msg_4_time + 0.5), msg_id_4)
- # ...check that callback actually fired
- self.assertEqual(len(markers), 1)
- # ...with expected parameters
- self.assertEqual(markers[0], (4, self._connection, msg_id_4))
-
-class TestTimeout(NeoUnitTestBase):
- def setUp(self):
- NeoUnitTestBase.setUp(self)
- self.current = time()
- self.timeout = Timeout()
- self._updateAt(0)
- self.assertTrue(PING_DELAY > PING_TIMEOUT) # Sanity check
-
- def _checkAt(self, n, soft, hard):
- at = self.current + n
- self.assertEqual(soft, self.timeout.softExpired(at))
- self.assertEqual(hard, self.timeout.hardExpired(at))
-
- def _updateAt(self, n, force=False):
- self.timeout.update(self.current + n, force=force)
-
- def _refreshAt(self, n):
- self.timeout.refresh(self.current + n)
-
- def _pingAt(self, n):
- self.timeout.ping(self.current + n)
-
- def testSoftTimeout(self):
- """
- Soft timeout is when a ping should be sent to peer to see if it's
- still responsive, after seing no life sign for PING_DELAY.
- """
- # Before PING_DELAY, no timeout.
- self._checkAt(PING_DELAY - 0.5, False, False)
- # If nothing came to refresh the timeout, soft timeout will be asserted
- # after PING_DELAY.
- self._checkAt(PING_DELAY + 0.5, True, False)
- # If something refreshes the timeout, soft timeout will not be asserted
- # after PING_DELAY.
- answer_time = PING_DELAY - 0.5
- self._refreshAt(answer_time)
- self._checkAt(PING_DELAY + 0.5, False, False)
- # ...but it will happen again after PING_DELAY after that answer
- self._checkAt(answer_time + PING_DELAY + 0.5, True, False)
- # if there is no more pending requests, a clear will happen so next
- # send doesn't immediately trigger a ping
- new_request_time = answer_time + PING_DELAY * 2
- self._updateAt(new_request_time, force=True)
- self._checkAt(new_request_time + PING_DELAY - 0.5, False, False)
- self._checkAt(new_request_time + PING_DELAY + 0.5, True, False)
-
- def testHardTimeout(self):
- """
- Hard timeout is when a ping was sent, and any life sign must come
- back to us before PING_TIMEOUT.
- """
- # A timeout triggered at PING_DELAY, so a ping was sent.
- self._pingAt(PING_DELAY)
- # Before PING_DELAY + PING_TIMEOUT, no timeout occurs.
- self._checkAt(PING_DELAY + PING_TIMEOUT - 0.5, False, False)
- # After PING_DELAY + PING_TIMEOUT, hard timeout occurs.
- self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, True)
- # If anything happened on the connection, there is no hard timeout
- # anymore after PING_DELAY + PING_TIMEOUT.
- self._refreshAt(PING_DELAY + PING_TIMEOUT - 0.5)
- self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, False)
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list