[Neo-report] r1874 gregory - in /trunk/neo: connection.py protocol.py

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Mar 1 11:54:46 CET 2010


Author: gregory
Date: Mon Mar  1 11:54:45 2010
New Revision: 1874

Log:
Initial implementation of HandlerSwitcher for connections.

The main purpose is to apply and handler after all pending requests are
satisfied and allow process answer packets of current state out of order.
This fix a bug that appears when a storage tries to get a lock on an object that
is already held by a previous transaction. Is this case the store is delayed
causing the answer packet be sent out of order (packet sequence breakage), so
unexpected from client's point of view.

Modified:
    trunk/neo/connection.py
    trunk/neo/protocol.py

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar  1 11:54:45 2010
@@ -15,8 +15,6 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-from Queue import deque
-
 from neo import logging
 from neo.locking import RLock
 
@@ -63,6 +61,65 @@
     return wrapper
 
 
+class HandlerSwitcher(object):
+
+    def __init__(self, connection, handler):
+        self._connection = connection
+        # pending handlers and related requests
+        self._pending = [[{}, handler]]
+
+    def clear(self):
+        handler = self._pending[0][1]
+        self._pending = [[{}, handler]]
+
+    def isPending(self):
+        return self._pending[0][0]
+
+    def getHandler(self):
+        return self._pending[0][1]
+
+    def emit(self, request):
+        # register the request in the current handler
+        assert len(self._pending) == 1 or self._pending[0][0]
+        (request_dict, _) = self._pending[-1]
+        msg_id = request.getId()
+        assert request.getAnswerClass() is not None, "Not a request"
+        assert msg_id not in request_dict, "Packet id already expected"
+        request_dict[msg_id] = request.getAnswerClass()
+
+    def handle(self, packet):
+        assert len(self._pending) == 1 or self._pending[0][0]
+        PACKET_LOGGER.dispatch(self._connection, packet, 'from')
+        msg_id = packet.getId()
+        (request_dict, handler) = self._pending[0]
+        # notifications are not expected
+        if not packet.isResponse():
+            handler.packetReceived(self._connection, packet)
+            return
+        # checkout the expected answer class
+        klass = request_dict.pop(msg_id, None)
+        if klass and isinstance(packet, klass) or packet.isError():
+            handler.packetReceived(self._connection, packet)
+            # apply a pending handler if no more answers are pending
+            if len(self._pending) > 1 and not request_dict:
+                del self._pending[0]
+                logging.debug('Apply handler %r', self._pending[0][1])
+        else:
+            logging.error('Unexpected answer: %r', packet)
+            self._connection.abort()
+            handler.peerBroken()
+
+    def setHandler(self, handler):
+        if len(self._pending) == 1 and not self._pending[0][0]:
+            # nothing is pending, change immediately
+            logging.debug('Set handler %r', handler)
+            self._pending[0][1] = handler
+        else:
+            # put the next handler in queue
+            logging.debug('Delay handler %r', handler)
+            self._pending.append([{}, handler])
+
+
 class BaseConnection(object):
     """A base connection."""
 
@@ -71,7 +128,7 @@
         self.em = event_manager
         self.connector = connector
         self.addr = addr
-        self.handler = handler
+        self._handlers = HandlerSwitcher(self, handler)
         if connector is not None:
             self.connector_handler = connector.__class__
             event_manager.register(self)
@@ -117,10 +174,10 @@
     __del__ = close
 
     def getHandler(self):
-        return self.handler
+        return self._handlers.getHandler()
 
     def setHandler(self, handler):
-        self.handler = handler
+        self._handlers.setHandler(handler)
 
     def getEventManager(self):
         return self.em
@@ -141,9 +198,6 @@
         return False
 
     def hasPendingMessages(self):
-        return False
-
-    def hasPendingRequests(self):
         return False
 
     def whoSetConnector(self):
@@ -200,25 +254,12 @@
         self.aborted = False
         self.uuid = None
         self._queue = []
-        self._expected = deque()
-        self._next_handler = None
         self._on_close = None
         BaseConnection.__init__(self, event_manager, handler,
                                 connector = connector, addr = addr,
                                 connector_handler = connector_handler)
         if connector is not None:
             event_manager.addReader(self)
-
-    def setHandler(self, handler):
-        assert self._next_handler is None
-        if self.hasPendingRequests():
-            logging.debug('Delaying: %s -> %s after %s',
-                self.handler.__class__.__name__, handler.__class__.__name__,
-                list(self._expected))
-            self._expected.append(APPLY_HANDLER)
-            self._next_handler = handler
-        else:
-            self.handler = handler
 
     def setOnClose(self, callback):
         assert self._on_close is None
@@ -256,7 +297,7 @@
         self.event_dict.clear()
         self.write_buf = ""
         self.read_buf = ""
-        self._expected.clear()
+        self._handlers.clear()
 
     def abort(self):
         """Abort dealing with this connection."""
@@ -290,7 +331,7 @@
                 if packet is None:
                     break
             except PacketMalformedError, msg:
-                self.handler._packetMalformed(self, msg)
+                self.getHandler()._packetMalformed(self, msg)
                 return
             self.read_buf = self.read_buf[len(packet):]
 
@@ -319,39 +360,13 @@
         """
         return len(self._queue) != 0
 
-    def hasPendingRequests(self):
-        """
-            Returns True if there are pending expected answer packets
-        """
-        return bool(self._expected)
-
     def process(self):
         """
           Process a pending packet.
         """
-        # check out packet and check if it's and expected answer
+        # check out packet and process it with current handler
         packet = self._queue.pop(0)
-        if packet.isResponse():
-            request = None
-            if self._expected:
-                request = self._expected.popleft()
-            if request is APPLY_HANDLER:
-                logging.debug('Apply handler %s',
-                        self._next_handler.__class__.__name__)
-                self.handler = self._next_handler
-                self._next_handler = None
-                return
-            if not request or not request.answerMatch(packet):
-                req_info = ('', '')
-                if request is not None:
-                    req_info = (request.getId(), request.__class__)
-                rep_info = (packet.getId(), packet.__class__)
-                logging.warning('Unexpected answer: %s:%s %s:%s' %
-                        (rep_info + req_info))
-
-        # process packet
-        PACKET_LOGGER.dispatch(self, packet, 'from')
-        self.handler.packetReceived(self, packet)
+        self._handlers.handle(packet)
 
     def pending(self):
         return self.connector is not None and self.write_buf
@@ -359,7 +374,7 @@
     def _closure(self):
         assert self.connector is not None, self.whoSetConnector()
         self.close()
-        self.handler.connectionClosed(self)
+        self.getHandler().connectionClosed(self)
 
     def _recv(self):
         """Receive data from a connector."""
@@ -375,7 +390,7 @@
         except ConnectorConnectionRefusedException:
             # should only occur while connecting
             self.close()
-            self.handler.connectionFailed(self)
+            self.getHandler().connectionFailed(self)
         except ConnectorConnectionClosedException:
             # connection resetted by peer, according to the man, this error
             # should not occurs but it seems it's false
@@ -469,8 +484,7 @@
         self.expectMessage(msg_id, timeout=timeout,
                 additional_timeout=additional_timeout)
         self._addPacket(packet)
-        assert packet.getAnswer() is not None, packet
-        self._expected.append(packet)
+        self._handlers.emit(packet)
         return msg_id
 
     @not_closed
@@ -509,7 +523,7 @@
                 event_manager.addWriter(self)
             else:
                 self.connecting = False
-                self.handler.connectionCompleted(self)
+                self.getHandler().connectionCompleted(self)
                 event_manager.addReader(self)
         except ConnectorConnectionRefusedException:
             handler.connectionFailed(self)
@@ -525,12 +539,12 @@
         if self.connecting:
             err = self.connector.getError()
             if err:
-                self.handler.connectionFailed(self)
+                self.getHandler().connectionFailed(self)
                 self.close()
                 return
             else:
                 self.connecting = False
-                self.handler.connectionCompleted(self)
+                self.getHandler().connectionCompleted(self)
                 self.em.addReader(self)
         else:
             Connection.writable(self)
@@ -594,8 +608,7 @@
         self.dispatcher.register(self, msg_id, queue)
         self.expectMessage(msg_id)
         self._addPacket(packet)
-        assert packet.getAnswer() is not None, packet
-        self._expected.append(packet)
+        self._handlers.emit(packet)
         return msg_id
 
     @lockCheckWrapper

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Mon Mar  1 11:54:45 2010
@@ -294,16 +294,13 @@
         assert body == '', "Non-empty packet decoding not implemented """
         return ()
 
+    def isError(self):
+        return isinstance(self, Error)
+
     def isResponse(self):
         return self._code & RESPONSE_MASK == RESPONSE_MASK
 
-    def answerMatch(self, answer):
-        id_match = self._id == answer._id
-        is_error = answer.__class__ == Error
-        assert self._answer is not None
-        return id_match and (is_error or isinstance(answer, self._answer))
-
-    def getAnswer(self):
+    def getAnswerClass(self):
         return self._answer
 
 





More information about the Neo-report mailing list