[Neo-report] r1962 gregory - in /trunk/neo: ./ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Mar 26 10:36:39 CET 2010


Author: gregory
Date: Fri Mar 26 10:36:38 2010
New Revision: 1962

Log:
Implement ReadBuffer to minimize incoming data copies.

ReadBuffer joins received strings only when all requested data is available.
This avoids many useless data copies in case of big packets. The gain factor
is about 50x for a 25MB packet.

Modified:
    trunk/neo/connection.py
    trunk/neo/protocol.py
    trunk/neo/tests/testConnection.py
    trunk/neo/tests/testUtil.py
    trunk/neo/util.py

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Fri Mar 26 10:36:38 2010
@@ -28,6 +28,7 @@
 from neo.logger import PACKET_LOGGER
 
 from neo import attributeTracker
+from neo.util import ReadBuffer
 from neo.profiling import profiler_decorator
 
 PING_DELAY = 5
@@ -284,7 +285,7 @@
     def __init__(self, event_manager, handler, connector, addr=None):
         BaseConnection.__init__(self, event_manager, handler,
                                 connector=connector, addr=addr)
-        self.read_buf = []
+        self.read_buf = ReadBuffer()
         self.write_buf = []
         self.cur_id = 0
         self.peer_id = 0
@@ -327,7 +328,7 @@
             self._on_close()
             self._on_close = None
         del self.write_buf[:]
-        del self.read_buf[:]
+        self.read_buf.clear()
         self._handlers.clear()
 
     def abort(self):
@@ -355,22 +356,16 @@
 
     def analyse(self):
         """Analyse received data."""
-        read_buf = self.read_buf
-        if len(read_buf) == 1:
-            msg = read_buf[0]
-        else:
-            msg = ''.join(self.read_buf)
         while True:
             # parse a packet
             try:
-                packet = Packets.parse(msg)
+                packet = Packets.parse(self.read_buf)
                 if packet is None:
                     break
             except PacketMalformedError, msg:
                 self.getHandler()._packetMalformed(self, msg)
                 return
             self._timeout.refresh(time())
-            msg = msg[len(packet):]
             packet_type = packet.getType()
             if packet_type == Packets.Ping:
                 # Send a pong notification
@@ -379,7 +374,6 @@
                 # Skip PONG packets, its only purpose is refresh the timeout
                 # generated upong ping.
                 self._queue.append(packet)
-        self.read_buf = [msg]
 
     def hasPendingMessages(self):
         """

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri Mar 26 10:36:38 2010
@@ -1525,11 +1525,12 @@
         # load packet classes
         self.update(StaticRegistry)
 
-    def parse(self, msg):
-        if len(msg) < MIN_PACKET_SIZE:
+    def parse(self, buf):
+        if len(buf) < PACKET_HEADER_SIZE:
             return None
-        msg_id, msg_type, msg_len = unpack(PACKET_HEADER_FORMAT,
-            msg[:PACKET_HEADER_SIZE])
+        header = buf.peek(PACKET_HEADER_SIZE)
+        assert header is not None
+        msg_id, msg_type, msg_len = unpack(PACKET_HEADER_FORMAT, header)
         try:
             packet_klass = self[msg_type]
         except KeyError:
@@ -1538,11 +1539,15 @@
             raise PacketMalformedError('message too big (%d)' % msg_len)
         if msg_len < MIN_PACKET_SIZE:
             raise PacketMalformedError('message too small (%d)' % msg_len)
-        if len(msg) < msg_len:
+        if len(buf) < msg_len:
             # Not enough.
             return None
+        buf.skip(PACKET_HEADER_SIZE)
+        msg_len -= PACKET_HEADER_SIZE
         packet = packet_klass()
-        packet.setContent(msg_id, msg[PACKET_HEADER_SIZE:msg_len])
+        data = buf.read(msg_len)
+        assert data is not None
+        packet.setContent(msg_id, data)
         return packet
 
     # packets registration

Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Fri Mar 26 10:36:38 2010
@@ -128,7 +128,15 @@
         self.assertEquals(len(calls), n)
 
     def _checkReadBuf(self, bc, data):
-        self.assertEqual(''.join(bc.read_buf), data)
+        content = bc.read_buf.peek(len(bc.read_buf))
+        self.assertEqual(''.join(content), data)
+
+    def _appendToReadBuf(self, bc, data):
+        bc.read_buf.append(data)
+
+    def _appendPacketToReadBuf(self, bc, packet):
+        data = ''.join(packet.encode())
+        bc.read_buf.append(data)
 
     def _checkWriteBuf(self, bc, data):
         self.assertEqual(''.join(bc.write_buf), data)
@@ -392,7 +400,7 @@
                 (("127.0.0.1", 2132), self.getNewUUID()))
         p = Packets.AnswerPrimary(self.getNewUUID(), master_list)
         p.setId(1)
-        bc.read_buf += p.encode()
+        self._appendPacketToReadBuf(bc, p)
         bc.analyse()
         # check packet decoded
         self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 1)
@@ -419,7 +427,7 @@
                 (("127.0.0.1", 2132), self.getNewUUID()))
         p1 = Packets.AnswerPrimary(self.getNewUUID(), master_list)
         p1.setId(1)
-        bc.read_buf += p1.encode()
+        self._appendPacketToReadBuf(bc, p1)
         # packet 2
         master_list = (
                 (("127.0.0.1", 2135), self.getNewUUID()),
@@ -432,8 +440,8 @@
                 (("127.0.0.1", 2132), self.getNewUUID()))
         p2 = Packets.AnswerPrimary( self.getNewUUID(), master_list)
         p2.setId(2)
-        bc.read_buf += p2.encode()
-        self.assertEqual(len(''.join(bc.read_buf)), len(p1) + len(p2))
+        self._appendPacketToReadBuf(bc, p2)
+        self.assertEqual(len(bc.read_buf), len(p1) + len(p2))
         bc.analyse()
         # check two packets decoded
         self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 2)
@@ -455,7 +463,7 @@
         # give a bad packet, won't be decoded
         bc = self._makeConnection()
         bc._queue = Mock()
-        bc.read_buf += "datadatadatadata"
+        self._appendToReadBuf(bc, 'datadatadatadata')
         self.assertEqual(len(bc.read_buf), 16)
         bc.analyse()
         self.assertEqual(len(bc.read_buf), 16)
@@ -476,7 +484,7 @@
                 (("127.0.0.1", 2132), self.getNewUUID()))
         p = Packets.AnswerPrimary(self.getNewUUID(), master_list)
         p.setId(1)
-        bc.read_buf += p.encode()
+        self._appendPacketToReadBuf(bc, p)
         bc.analyse()
         # check packet decoded
         self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 1)
@@ -485,7 +493,7 @@
         self.assertEqual(data.getType(), p.getType())
         self.assertEqual(data.getId(), p.getId())
         self.assertEqual(data.decode(), p.decode())
-        self.assertEqual(''.join(bc.read_buf), '')
+        self._checkReadBuf(bc, '')
 
     def test_Connection_writable1(self):
         # with  pending operation after send

Modified: trunk/neo/tests/testUtil.py
==============================================================================
--- trunk/neo/tests/testUtil.py [iso-8859-1] (original)
+++ trunk/neo/tests/testUtil.py [iso-8859-1] Fri Mar 26 10:36:38 2010
@@ -18,11 +18,51 @@
 import unittest
 
 from neo.tests import NeoTestBase
-from neo import util
+from neo.util import ReadBuffer
 
 class UtilTests(NeoTestBase):
 
-    pass
+    def testReadBufferRead(self):
+        """ Append some chunk then consume the data """
+        buf = ReadBuffer()
+        self.assertEqual(len(buf), 0)
+        buf.append('abc')
+        self.assertEqual(len(buf), 3)
+        # no enough data
+        self.assertEqual(buf.read(4), None)
+        self.assertEqual(len(buf), 3)
+        buf.append('def')
+        # consume a part
+        self.assertEqual(len(buf), 6)
+        self.assertEqual(buf.read(4), 'abcd')
+        self.assertEqual(len(buf), 2)
+        # consume the rest
+        self.assertEqual(buf.read(3), None)
+        self.assertEqual(buf.read(2), 'ef')
+
+    def testReadBufferPeek(self):
+        buf = ReadBuffer()
+        self.assertEqual(len(buf), 0)
+        buf.append('abc')
+        self.assertEqual(len(buf), 3)
+        # peek some data
+        self.assertEqual(buf.peek(3), 'abc')
+        self.assertEqual(buf.peek(5), None) # not enough
+        buf.append('def')
+        self.assertEqual(len(buf), 6)
+        self.assertEqual(buf.peek(3), 'abc') # no change
+        self.assertEqual(buf.peek(6), 'abcdef')
+        self.assertEqual(buf.peek(7), None)
+
+    def testReadBufferSkip(self):
+        buf = ReadBuffer()
+        self.assertEqual(len(buf), 0)
+        buf.append('abc')
+        self.assertEqual(len(buf), 3)
+        buf.skip(1)
+        self.assertEqual(len(buf), 2)
+        buf.skip(3) # eat all
+        self.assertEqual(len(buf), 0)
 
 if __name__ == "__main__":
     unittest.main()

Modified: trunk/neo/util.py
==============================================================================
--- trunk/neo/util.py [iso-8859-1] (original)
+++ trunk/neo/util.py [iso-8859-1] Fri Mar 26 10:36:38 2010
@@ -19,6 +19,7 @@
 import re
 import socket
 from zlib import adler32
+from Queue import deque
 from struct import pack, unpack
 
 def u64(s):
@@ -130,3 +131,83 @@
     def getByName(self, name):
         return getattr(self, name)
 
+
+class ReadBuffer(object):
+    """
+        Implementation of a lazy buffer. Main purpose if to reduce useless
+        copies of data by storing chunks and join them only when the requested
+        size is available.
+    """
+
+    def __init__(self):
+        self.size = 0
+        self.content = deque()
+
+    def append(self, data):
+        """ Append some data and compute the new buffer size """
+        size = len(data)
+        self.size += size
+        self.content.append((size, data))
+
+    def __len__(self):
+        """ Return the current buffer size """
+        return self.size
+
+    def _read(self, size):
+        """ Join all required chunks to build a string of requested size """
+        chunk_list = []
+        pop_chunk = self.content.popleft
+        append_data = chunk_list.append
+        # select required chunks
+        while size > 0:
+            chunk_size, chunk_data = pop_chunk()
+            size -= chunk_size
+            append_data(chunk_data)
+        if size < 0:
+            # too many bytes consumed, cut the last chunk
+            last_chunk = chunk_list[-1]
+            keep, let = last_chunk[:size], last_chunk[size:]
+            self.content.appendleft((-size, let))
+            chunk_list[-1] = keep
+        # join all chunks (one copy)
+        return ''.join(chunk_list)
+
+    def skip(self, size):
+        """ Skip at most size bytes """
+        if self.size <= size:
+            self.size = 0
+            self.content.clear()
+            return
+        pop_chunk = self.content.popleft
+        self.size -= size
+        # skip chunks
+        while size > 0:
+            chunk_size, last_chunk = pop_chunk()
+            size -= chunk_size
+        if size < 0:
+            # but keep a part of the last one if needed
+            self.content.append((-size, last_chunk[size:]))
+
+    def peek(self, size):
+        """ Read size bytes but don't consume """
+        if self.size < size:
+            return None
+        data = self._read(size)
+        self.content.appendleft((size, data))
+        assert len(data) == size
+        return data
+
+    def read(self, size):
+        """ Read and consume size bytes """
+        if self.size < size:
+            return None
+        self.size -= size
+        data = self._read(size)
+        assert len(data) == size
+        return data
+
+    def clear(self):
+        """ Erase all buffer content """
+        self.size = 0
+        self.content.clear()
+





More information about the Neo-report mailing list