[Neo-report] r2002 vincent - in /trunk/neo: tests/testConnection.py tests/testUtil.py util.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Apr 19 11:49:42 CEST 2010
Author: vincent
Date: Mon Apr 19 11:49:41 2010
New Revision: 2002
Log:
Remove ReadBuffer.skip and ReadBuffer.peek as they are no longer used, and inline the private ReadBuffer._read helper into ReadBuffer.read.
Modified:
trunk/neo/tests/testConnection.py
trunk/neo/tests/testUtil.py
trunk/neo/util.py
Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Mon Apr 19 11:49:41 2010
@@ -128,7 +128,7 @@
self.assertEquals(len(calls), n)
def _checkReadBuf(self, bc, data):
- content = bc.read_buf.peek(len(bc.read_buf))
+ content = bc.read_buf.read(len(bc.read_buf))
self.assertEqual(''.join(content), data)
def _appendToReadBuf(self, bc, data):
@@ -464,10 +464,10 @@
bc = self._makeConnection()
bc._queue = Mock()
self._appendToReadBuf(bc, 'datadatadatadata')
- self.assertEqual(len(bc.read_buf), 16)
bc.analyse()
- self.assertEqual(len(bc.read_buf), 16)
self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 0)
+ self.assertEquals(
+ len(self.handler.mockGetNamedCalls("_packetMalformed")), 1)
def test_Connection_analyse4(self):
# give an expected packet
Modified: trunk/neo/tests/testUtil.py
==============================================================================
--- trunk/neo/tests/testUtil.py [iso-8859-1] (original)
+++ trunk/neo/tests/testUtil.py [iso-8859-1] Mon Apr 19 11:49:41 2010
@@ -40,30 +40,6 @@
self.assertEqual(buf.read(3), None)
self.assertEqual(buf.read(2), 'ef')
- def testReadBufferPeek(self):
- buf = ReadBuffer()
- self.assertEqual(len(buf), 0)
- buf.append('abc')
- self.assertEqual(len(buf), 3)
- # peek some data
- self.assertEqual(buf.peek(3), 'abc')
- self.assertEqual(buf.peek(5), None) # not enough
- buf.append('def')
- self.assertEqual(len(buf), 6)
- self.assertEqual(buf.peek(3), 'abc') # no change
- self.assertEqual(buf.peek(6), 'abcdef')
- self.assertEqual(buf.peek(7), None)
-
- def testReadBufferSkip(self):
- buf = ReadBuffer()
- self.assertEqual(len(buf), 0)
- buf.append('abc')
- self.assertEqual(len(buf), 3)
- buf.skip(1)
- self.assertEqual(len(buf), 2)
- buf.skip(3) # eat all
- self.assertEqual(len(buf), 0)
-
if __name__ == "__main__":
unittest.main()
Modified: trunk/neo/util.py
==============================================================================
--- trunk/neo/util.py [iso-8859-1] (original)
+++ trunk/neo/util.py [iso-8859-1] Mon Apr 19 11:49:41 2010
@@ -154,56 +154,29 @@
""" Return the current buffer size """
return self.size
- def _read(self, size):
- """ Join all required chunks to build a string of requested size """
- chunk_list = []
- pop_chunk = self.content.popleft
- append_data = chunk_list.append
- # select required chunks
- while size > 0:
- chunk_size, chunk_data = pop_chunk()
- size -= chunk_size
- append_data(chunk_data)
- if size < 0:
- # too many bytes consumed, cut the last chunk
- last_chunk = chunk_list[-1]
- keep, let = last_chunk[:size], last_chunk[size:]
- self.content.appendleft((-size, let))
- chunk_list[-1] = keep
- # join all chunks (one copy)
- return ''.join(chunk_list)
-
- def skip(self, size):
- """ Skip at most size bytes """
- if self.size <= size:
- self.size = 0
- self.content.clear()
- return
- pop_chunk = self.content.popleft
- self.size -= size
- # skip chunks
- while size > 0:
- chunk_size, last_chunk = pop_chunk()
- size -= chunk_size
- if size < 0:
- # but keep a part of the last one if needed
- self.content.append((-size, last_chunk[size:]))
-
- def peek(self, size):
- """ Read size bytes but don't consume """
- if self.size < size:
- return None
- data = self._read(size)
- self.content.appendleft((size, data))
- assert len(data) == size
- return data
-
def read(self, size):
""" Read and consume size bytes """
if self.size < size:
return None
self.size -= size
- data = self._read(size)
+ chunk_list = []
+ pop_chunk = self.content.popleft
+ append_data = chunk_list.append
+ to_read = size
+ chunk_len = 0
+ # select required chunks
+ while to_read > 0:
+ chunk_size, chunk_data = pop_chunk()
+ to_read -= chunk_size
+ append_data(chunk_data)
+ if to_read < 0:
+ # too many bytes consumed, cut the last chunk
+ last_chunk = chunk_list[-1]
+ keep, let = last_chunk[:to_read], last_chunk[to_read:]
+ self.content.appendleft((-to_read, let))
+ chunk_list[-1] = keep
+ # join all chunks (one copy)
+ data = ''.join(chunk_list)
assert len(data) == size
return data
More information about the Neo-report mailing list