[Neo-report] r2009 vincent - in /trunk/neo: connection.py tests/testConnection.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Apr 22 17:55:09 CEST 2010
Author: vincent
Date: Thu Apr 22 17:55:09 2010
New Revision: 2009
Log:
Change (again) the way timeouts are handled.
There are two distinct kinds of timeout events:
- an unresponsive node
  This is a connection-level timeout.
  This is handled by the Timeout class, triggering pings and monitoring
  incoming data to decide when the remote node is considered dead.
- "too long" processing by an otherwise responsive node
  This is a per-request timeout.
  This is handled by the HandlerSwitcher class, which only triggers
  a disconnection when an answer takes too long to arrive (historical
  behaviour, not so useful when exchanging with a single-threaded peer).
The previous implementation mixed both and had shortcomings (pings would
time out almost immediately, and it was not possible to tell which message
caused a timeout).
Update tests.
Modified:
trunk/neo/connection.py
trunk/neo/tests/testConnection.py
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Thu Apr 22 17:55:09 2010
@@ -31,7 +31,7 @@
from neo.util import ReadBuffer
from neo.profiling import profiler_decorator
-PING_DELAY = 5
+PING_DELAY = 6
PING_TIMEOUT = 5
INCOMING_TIMEOUT = 10
CRITICAL_TIMEOUT = 30
@@ -70,6 +70,8 @@
class HandlerSwitcher(object):
+ _next_timeout = None
+ _next_timeout_msg_id = None
def __init__(self, connection, handler):
self._connection = connection
@@ -87,7 +89,7 @@
return self._pending[0][1]
@profiler_decorator
- def emit(self, request):
+ def emit(self, request, timeout):
# register the request in the current handler
_pending = self._pending
assert len(_pending) == 1 or _pending[0][0]
@@ -96,7 +98,19 @@
answer_class = request.getAnswerClass()
assert answer_class is not None, "Not a request"
assert msg_id not in request_dict, "Packet id already expected"
- request_dict[msg_id] = answer_class
+ next_timeout = self._next_timeout
+ if next_timeout is None or timeout < next_timeout:
+ self._next_timeout = timeout
+ self._next_timeout_msg_id = msg_id
+ request_dict[msg_id] = (answer_class, timeout)
+
+ def checkTimeout(self, t):
+ next_timeout = self._next_timeout
+ if next_timeout is not None and next_timeout < t:
+ result = self._next_timeout_msg_id
+ else:
+ result = None
+ return result
@profiler_decorator
def handle(self, packet):
@@ -109,7 +123,7 @@
handler.packetReceived(self._connection, packet)
return
# checkout the expected answer class
- klass = request_dict.pop(msg_id, None)
+ (klass, timeout) = request_dict.pop(msg_id, (None, None))
if klass and isinstance(packet, klass) or packet.isError():
handler.packetReceived(self._connection, packet)
else:
@@ -122,6 +136,18 @@
while len(self._pending) > 1 and not self._pending[0][0]:
del self._pending[0]
logging.debug('Apply handler %r', self._pending[0][1])
+ if timeout == self._next_timeout:
+ # Find next timeout and its msg_id
+ timeout_list = []
+ extend = timeout_list.extend
+ for (request_dict, handler) in self._pending:
+ extend(((timeout, msg_id) \
+ for msg_id, (_, timeout) in request_dict.iteritems()))
+ if timeout_list:
+ timeout_list.sort(key=lambda x: x[0])
+ self._next_timeout, self._next_timeout_msg_id = timeout_list[0]
+ else:
+ self._next_timeout, self._next_timeout_msg_id = None, None
@profiler_decorator
def setHandler(self, handler):
@@ -136,31 +162,48 @@
class Timeout(object):
- """ Keep track of current timeouts """
+ """ Keep track of connection-level timeouts """
def __init__(self):
self._ping_time = None
self._critical_time = None
- def update(self, t, timeout=CRITICAL_TIMEOUT):
- """ Update the new critical time """
- self._ping_time = t + PING_TIMEOUT
- critical_time = self._ping_time + timeout
- self._critical_time = max(critical_time, self._critical_time)
+ def update(self, t):
+ """
+ Send occurred:
+ - set ping time if earlier than existing one
+ """
+ ping_time = self._ping_time
+ t += PING_DELAY
+ if ping_time is None or t < ping_time:
+ self._ping_time = t
def refresh(self, t):
- """ Refresh timeout after something received """
+ """
+ Recv occured:
+ - reschedule next ping time
+ - as this is an evidence that node is alive, remove pong expectation
+ """
self._ping_time = t + PING_DELAY
+ self._critical_time = None
+
+ def ping(self, t):
+ """
+ Ping send occured:
+ - reschedule next ping time
+ - set pong expectation
+ """
+ self._ping_time = t + PING_DELAY
+ self._critical_time = t + PING_TIMEOUT
def softExpired(self, t):
- """ Indicate if the soft timeout (ping delay) is reached """
- # hard timeout takes precedences
- return self._ping_time < t < self._critical_time
+ """ Do we need to ping ? """
+ return self._ping_time < t
def hardExpired(self, t):
- """ Indicate if hard (or pong) timeout is reached """
- # should be called if softExpired if False
- return self._critical_time < t or self._ping_time < t
+ """ Have we reached pong latest arrival time, if set ? """
+ critical_time = self._critical_time
+ return critical_time is not None and critical_time < t
class BaseConnection(object):
@@ -176,16 +219,23 @@
event_manager.register(self)
def checkTimeout(self, t):
- if self._handlers.isPending():
- if self._timeout.softExpired(t):
- self._timeout.refresh(t)
- self.ping()
+ handlers = self._handlers
+ if handlers.isPending():
+ msg_id = handlers.checkTimeout(t)
+ if msg_id is not None:
+ logging.info('timeout for %r with %s:%d', msg_id,
+ *self.getAddress())
+ self.close()
+ self.getHandler().timeoutExpired(self)
elif self._timeout.hardExpired(t):
# critical time reach or pong not received, abort
logging.info('timeout with %s:%d', *(self.getAddress()))
self.notify(Packets.Notify('Timeout'))
self.abort()
self.getHandler().timeoutExpired(self)
+ elif self._timeout.softExpired(t):
+ self._timeout.ping(t)
+ self.ping()
def lock(self):
return 1
@@ -272,7 +322,7 @@
new_conn = ServerConnection(self.getEventManager(), handler,
connector=new_s, addr=addr)
# A request for a node identification should arrive.
- self._timeout.update(time(), timeout=INCOMING_TIMEOUT)
+ self._timeout.update(time())
handler.connectionAccepted(new_conn)
except ConnectorTryAgainException:
pass
@@ -493,10 +543,11 @@
msg_id = self._getNextId()
packet.setId(msg_id)
self._addPacket(packet)
+ t = time()
# If there is no pending request, initialise timeout values.
if not self._handlers.isPending():
- self._timeout.update(time(), timeout=timeout)
- self._handlers.emit(packet)
+ self._timeout.update(t)
+ self._handlers.emit(packet, t + timeout)
return msg_id
@not_closed
@@ -615,10 +666,11 @@
packet.setId(msg_id)
self.dispatcher.register(self, msg_id, self._local_var.queue)
self._addPacket(packet)
+ t = time()
# If there is no pending request, initialise timeout values.
if not self._handlers.isPending():
- self._timeout.update(time(), timeout=timeout)
- self._handlers.emit(packet)
+ self._timeout.update(t)
+ self._handlers.emit(packet, t + timeout)
return msg_id
finally:
self.unlock()
Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Thu Apr 22 17:55:09 2010
@@ -19,7 +19,7 @@
from mock import Mock
from neo.connection import ListeningConnection, Connection, \
ClientConnection, ServerConnection, MTClientConnection, \
- HandlerSwitcher, Timeout
+ HandlerSwitcher, Timeout, PING_DELAY, PING_TIMEOUT
from neo.connector import getConnectorHandler, registerConnectorHandler
from neo.tests import DoNothingConnector
from neo.connector import ConnectorException, ConnectorTryAgainException, \
@@ -808,7 +808,7 @@
def testEmit(self):
self.assertFalse(self._handlers.isPending())
request = self._makeRequest(1)
- self._handlers.emit(request)
+ self._handlers.emit(request, 0)
self.assertTrue(self._handlers.isPending())
def testHandleNotification(self):
@@ -818,7 +818,7 @@
self._checkPacketReceived(self._handler, notif1)
# emit a request and delay an handler
request = self._makeRequest(2)
- self._handlers.emit(request)
+ self._handlers.emit(request, 0)
handler = self._makeHandler()
self._handlers.setHandler(handler)
# next notification fall into the current handler
@@ -835,7 +835,7 @@
def testHandleAnswer1(self):
# handle with current handler
request = self._makeRequest(1)
- self._handlers.emit(request)
+ self._handlers.emit(request, 0)
answer = self._makeAnswer(1)
self._handlers.handle(answer)
self._checkPacketReceived(self._handler, answer)
@@ -843,7 +843,7 @@
def testHandleAnswer2(self):
# handle with blocking handler
request = self._makeRequest(1)
- self._handlers.emit(request)
+ self._handlers.emit(request, 0)
handler = self._makeHandler()
self._handlers.setHandler(handler)
answer = self._makeAnswer(1)
@@ -863,11 +863,11 @@
h2 = self._makeHandler()
h3 = self._makeHandler()
# emit all requests and setHandleres
- self._handlers.emit(r1)
+ self._handlers.emit(r1, 0)
self._handlers.setHandler(h1)
- self._handlers.emit(r2)
+ self._handlers.emit(r2, 0)
self._handlers.setHandler(h2)
- self._handlers.emit(r3)
+ self._handlers.emit(r3, 0)
self._handlers.setHandler(h3)
self._checkCurrentHandler(self._handler)
self.assertTrue(self._handlers.isPending())
@@ -889,9 +889,9 @@
a3 = self._makeAnswer(3)
h = self._makeHandler()
# emit all requests
- self._handlers.emit(r1)
- self._handlers.emit(r2)
- self._handlers.emit(r3)
+ self._handlers.emit(r1, 0)
+ self._handlers.emit(r2, 0)
+ self._handlers.emit(r3, 0)
self._handlers.setHandler(h)
# process answers
self._handlers.handle(a1)
@@ -908,59 +908,98 @@
a2 = self._makeAnswer(2)
h = self._makeHandler()
# emit requests aroung state setHandler
- self._handlers.emit(r1)
+ self._handlers.emit(r1, 0)
self._handlers.setHandler(h)
- self._handlers.emit(r2)
+ self._handlers.emit(r2, 0)
# process answer for next state
self._handlers.handle(a2)
self.checkAborted(self._connection)
+ def testTimeout(self):
+ """
+ This timeout happens when a request has not been answered for longer
+ than a duration defined at emit() time.
+ """
+ now = time()
+ # No timeout when no pending request
+ self.assertEqual(self._handlers.checkTimeout(now), None)
+ # Prepare some requests
+ msg_id_1 = 1
+ msg_id_2 = 2
+ msg_id_3 = 3
+ r1 = self._makeRequest(msg_id_1)
+ a1 = self._makeAnswer(msg_id_1)
+ r2 = self._makeRequest(msg_id_2)
+ r3 = self._makeRequest(msg_id_3)
+ msg_1_time = now + 5
+ msg_2_time = msg_1_time + 5
+ msg_3_time = msg_2_time + 5
+ # Emit r3 before all other, to test that it's time parameter value
+ # which is used, not the registration order.
+ self._handlers.emit(r3, msg_3_time)
+ self._handlers.emit(r1, msg_1_time)
+ self._handlers.emit(r2, msg_2_time)
+ # No timeout before msg_1_time
+ self.assertEqual(self._handlers.checkTimeout(now), None)
+ # Timeout for msg_1 after msg_1_time
+ self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5),
+ msg_id_1)
+ # If msg_1 met its answer, no timeout after msg_1_time
+ self._handlers.handle(a1)
+ self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5), None)
+ # Next timeout is after msg_2_time
+ self.assertEqual(self._handlers.checkTimeout(msg_2_time + 0.5), msg_id_2)
class TestTimeout(NeoTestBase):
- """ assume PING_DELAY=5 """
-
def setUp(self):
- self.initial = time()
- self.current = self.initial
+ self.current = time()
self.timeout = Timeout()
-
- def checkAfter(self, n, soft, hard):
+ self.timeout.update(self.current)
+
+ def _checkAt(self, n, soft, hard):
at = self.current + n
self.assertEqual(soft, self.timeout.softExpired(at))
self.assertEqual(hard, self.timeout.hardExpired(at))
- def refreshAfter(self, n):
- self.current += n
- self.timeout.refresh(self.current)
-
- def testNoTimeout(self):
- self.timeout.update(self.initial, 5)
- self.checkAfter(1, False, False)
- self.checkAfter(4, False, False)
- self.refreshAfter(4) # answer received
- self.checkAfter(1, False, False)
+ def _refreshAt(self, n):
+ self.timeout.refresh(self.current + n)
+
+ def _pingAt(self, n):
+ self.timeout.ping(self.current + n)
def testSoftTimeout(self):
- self.timeout.update(self.initial, 5)
- self.checkAfter(1, False, False)
- self.checkAfter(4, False, False)
- self.checkAfter(6, True, True) # ping
- self.refreshAfter(8) # pong
- self.checkAfter(1, False, False)
- self.checkAfter(4, False, True)
+ """
+ Soft timeout is when a ping should be sent to peer to see if it's
+ still responsive, after seing no life sign for PING_DELAY.
+ """
+ # Before PING_DELAY, no timeout.
+ self._checkAt(PING_DELAY - 0.5, False, False)
+ # If nothing came to refresh the timeout, soft timeout will be asserted
+ # after PING_DELAY.
+ self._checkAt(PING_DELAY + 0.5, True, False)
+ # If something refreshes the timeout, soft timeout will not be asserted
+ # after PING_DELAY.
+ answer_time = PING_DELAY - 0.5
+ self._refreshAt(answer_time)
+ self._checkAt(PING_DELAY + 0.5, False, False)
+ # ...but it will happen again after PING_DELAY after that answer
+ self._checkAt(answer_time + PING_DELAY + 0.5, True, False)
def testHardTimeout(self):
- self.timeout.update(self.initial, 5)
- self.checkAfter(1, False, False)
- self.checkAfter(4, False, False)
- self.checkAfter(6, True, True) # ping
- self.refreshAfter(6) # pong
- self.checkAfter(1, False, False)
- self.checkAfter(4, False, False)
- self.checkAfter(6, False, True) # ping
- self.refreshAfter(6) # pong
- self.checkAfter(1, False, True) # too late
- self.checkAfter(5, False, True)
+ """
+ Hard timeout is when a ping was sent, and any life sign must come
+ back to us before PING_TIMEOUT.
+ """
+ # A timeout triggered at PING_DELAY, so a ping was sent.
+ self._pingAt(PING_DELAY)
+ # Before PING_DELAY + PING_TIMEOUT, no timeout occurs.
+ self._checkAt(PING_DELAY + PING_TIMEOUT - 0.5, False, False)
+ # After PING_DELAY + PING_TIMEOUT, hard timeout occurs.
+ self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, True)
+ # If anything happened on the connection, there is no hard timeout
+ # anymore after PING_DELAY + PING_TIMEOUT.
+ self._refreshAt(PING_DELAY + PING_TIMEOUT - 0.5)
+ self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, False)
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list