[Neo-report] r2002 vincent - in /trunk/neo: tests/testConnection.py tests/testUtil.py util.py

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Apr 19 11:49:42 CEST 2010


Author: vincent
Date: Mon Apr 19 11:49:41 2010
New Revision: 2002

Log:
Remove ReadBuffer.skip and ReadBuffer.peek, as they are no longer used, and
inline the private ReadBuffer._read helper into ReadBuffer.read.

Modified:
    trunk/neo/tests/testConnection.py
    trunk/neo/tests/testUtil.py
    trunk/neo/util.py

Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Mon Apr 19 11:49:41 2010
@@ -128,7 +128,7 @@
         self.assertEquals(len(calls), n)
 
     def _checkReadBuf(self, bc, data):
-        content = bc.read_buf.peek(len(bc.read_buf))
+        content = bc.read_buf.read(len(bc.read_buf))
         self.assertEqual(''.join(content), data)
 
     def _appendToReadBuf(self, bc, data):
@@ -464,10 +464,10 @@
         bc = self._makeConnection()
         bc._queue = Mock()
         self._appendToReadBuf(bc, 'datadatadatadata')
-        self.assertEqual(len(bc.read_buf), 16)
         bc.analyse()
-        self.assertEqual(len(bc.read_buf), 16)
         self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 0)
+        self.assertEquals(
+            len(self.handler.mockGetNamedCalls("_packetMalformed")), 1)
 
     def test_Connection_analyse4(self):
         # give an expected packet

Modified: trunk/neo/tests/testUtil.py
==============================================================================
--- trunk/neo/tests/testUtil.py [iso-8859-1] (original)
+++ trunk/neo/tests/testUtil.py [iso-8859-1] Mon Apr 19 11:49:41 2010
@@ -40,30 +40,6 @@
         self.assertEqual(buf.read(3), None)
         self.assertEqual(buf.read(2), 'ef')
 
-    def testReadBufferPeek(self):
-        buf = ReadBuffer()
-        self.assertEqual(len(buf), 0)
-        buf.append('abc')
-        self.assertEqual(len(buf), 3)
-        # peek some data
-        self.assertEqual(buf.peek(3), 'abc')
-        self.assertEqual(buf.peek(5), None) # not enough
-        buf.append('def')
-        self.assertEqual(len(buf), 6)
-        self.assertEqual(buf.peek(3), 'abc') # no change
-        self.assertEqual(buf.peek(6), 'abcdef')
-        self.assertEqual(buf.peek(7), None)
-
-    def testReadBufferSkip(self):
-        buf = ReadBuffer()
-        self.assertEqual(len(buf), 0)
-        buf.append('abc')
-        self.assertEqual(len(buf), 3)
-        buf.skip(1)
-        self.assertEqual(len(buf), 2)
-        buf.skip(3) # eat all
-        self.assertEqual(len(buf), 0)
-
 if __name__ == "__main__":
     unittest.main()
 

Modified: trunk/neo/util.py
==============================================================================
--- trunk/neo/util.py [iso-8859-1] (original)
+++ trunk/neo/util.py [iso-8859-1] Mon Apr 19 11:49:41 2010
@@ -154,56 +154,29 @@
         """ Return the current buffer size """
         return self.size
 
-    def _read(self, size):
-        """ Join all required chunks to build a string of requested size """
-        chunk_list = []
-        pop_chunk = self.content.popleft
-        append_data = chunk_list.append
-        # select required chunks
-        while size > 0:
-            chunk_size, chunk_data = pop_chunk()
-            size -= chunk_size
-            append_data(chunk_data)
-        if size < 0:
-            # too many bytes consumed, cut the last chunk
-            last_chunk = chunk_list[-1]
-            keep, let = last_chunk[:size], last_chunk[size:]
-            self.content.appendleft((-size, let))
-            chunk_list[-1] = keep
-        # join all chunks (one copy)
-        return ''.join(chunk_list)
-
-    def skip(self, size):
-        """ Skip at most size bytes """
-        if self.size <= size:
-            self.size = 0
-            self.content.clear()
-            return
-        pop_chunk = self.content.popleft
-        self.size -= size
-        # skip chunks
-        while size > 0:
-            chunk_size, last_chunk = pop_chunk()
-            size -= chunk_size
-        if size < 0:
-            # but keep a part of the last one if needed
-            self.content.append((-size, last_chunk[size:]))
-
-    def peek(self, size):
-        """ Read size bytes but don't consume """
-        if self.size < size:
-            return None
-        data = self._read(size)
-        self.content.appendleft((size, data))
-        assert len(data) == size
-        return data
-
     def read(self, size):
         """ Read and consume size bytes """
         if self.size < size:
             return None
         self.size -= size
-        data = self._read(size)
+        chunk_list = []
+        pop_chunk = self.content.popleft
+        append_data = chunk_list.append
+        to_read = size
+        chunk_len = 0
+        # select required chunks
+        while to_read > 0:
+            chunk_size, chunk_data = pop_chunk()
+            to_read -= chunk_size
+            append_data(chunk_data)
+        if to_read < 0:
+            # too many bytes consumed, cut the last chunk
+            last_chunk = chunk_list[-1]
+            keep, let = last_chunk[:to_read], last_chunk[to_read:]
+            self.content.appendleft((-to_read, let))
+            chunk_list[-1] = keep
+        # join all chunks (one copy)
+        data = ''.join(chunk_list)
         assert len(data) == size
         return data
 





More information about the Neo-report mailing list