[Neo-report] r2032 vincent - /trunk/neo/tests/testConnection.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Apr 27 17:11:38 CEST 2010
Author: vincent
Date: Tue Apr 27 17:11:35 2010
New Revision: 2032
Log:
Add tests for ping/pong handling in Connection.analyse.
Modified:
trunk/neo/tests/testConnection.py
Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Tue Apr 27 17:11:35 2010
@@ -24,8 +24,9 @@
from neo.tests import DoNothingConnector
from neo.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException
-from neo.protocol import Packets
+from neo.protocol import Packets, ParserState
from neo.tests import NeoTestBase
+from neo.util import ReadBuffer
class ConnectionTests(NeoTestBase):
@@ -505,6 +506,40 @@
self.assertEqual(data.decode(), p.decode())
self._checkReadBuf(bc, '')
+ def test_Connection_analyse5(self):
+ # test ping handling
+ bc = self._makeConnection()
+ bc._queue = Mock()
+ p = Packets.Ping()
+ p.setId(1)
+ self._appendPacketToReadBuf(bc, p)
+ bc.analyse()
+ # check no packet was queued
+ self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 0)
+ # check pong answered
+ parser_state = ParserState()
+ buffer = ReadBuffer()
+ for chunk in bc.write_buf:
+ buffer.append(chunk)
+ answer = Packets.parse(buffer, parser_state)
+ self.assertTrue(answer is not None)
+ self.assertTrue(answer.getType() == Packets.Pong)
+ self.assertEqual(answer.getId(), p.getId())
+
+ def test_Connection_analyse6(self):
+ # test pong handling
+ bc = self._makeConnection()
+ bc._timeout = Mock()
+ bc._queue = Mock()
+ p = Packets.Pong()
+ p.setId(1)
+ self._appendPacketToReadBuf(bc, p)
+ bc.analyse()
+ # check no packet was queued
+ self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 0)
+ # check timeout has been refreshed
+ self.assertEquals(len(bc._timeout.mockGetNamedCalls("refresh")), 1)
+
def test_Connection_writable1(self):
# with pending operation after send
def send(self, data):
More information about the Neo-report
mailing list