[Neo-report] r1874 gregory - in /trunk/neo: connection.py protocol.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Mar 1 11:54:46 CET 2010
Author: gregory
Date: Mon Mar 1 11:54:45 2010
New Revision: 1874
Log:
Initial implementation of HandlerSwitcher for connections.
The main purpose is to apply a new handler only after all pending requests are
satisfied, and to allow answer packets of the current state to be processed out
of order.
This fixes a bug that appears when a storage tries to get a lock on an object
that is already held by a previous transaction. In this case the store is
delayed, causing the answer packet to be sent out of order (packet sequence
breakage), which is unexpected from the client's point of view.
Modified:
trunk/neo/connection.py
trunk/neo/protocol.py
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar 1 11:54:45 2010
@@ -15,8 +15,6 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from Queue import deque
-
from neo import logging
from neo.locking import RLock
@@ -63,6 +61,65 @@
return wrapper
+class HandlerSwitcher(object):
+
+ def __init__(self, connection, handler):
+ self._connection = connection
+ # pending handlers and related requests
+ self._pending = [[{}, handler]]
+
+ def clear(self):
+ handler = self._pending[0][1]
+ self._pending = [[{}, handler]]
+
+ def isPending(self):
+ return self._pending[0][0]
+
+ def getHandler(self):
+ return self._pending[0][1]
+
+ def emit(self, request):
+ # register the request in the current handler
+ assert len(self._pending) == 1 or self._pending[0][0]
+ (request_dict, _) = self._pending[-1]
+ msg_id = request.getId()
+ assert request.getAnswerClass() is not None, "Not a request"
+ assert msg_id not in request_dict, "Packet id already expected"
+ request_dict[msg_id] = request.getAnswerClass()
+
+ def handle(self, packet):
+ assert len(self._pending) == 1 or self._pending[0][0]
+ PACKET_LOGGER.dispatch(self._connection, packet, 'from')
+ msg_id = packet.getId()
+ (request_dict, handler) = self._pending[0]
+ # notifications are not expected
+ if not packet.isResponse():
+ handler.packetReceived(self._connection, packet)
+ return
+ # checkout the expected answer class
+ klass = request_dict.pop(msg_id, None)
+ if klass and isinstance(packet, klass) or packet.isError():
+ handler.packetReceived(self._connection, packet)
+ # apply a pending handler if no more answers are pending
+ if len(self._pending) > 1 and not request_dict:
+ del self._pending[0]
+ logging.debug('Apply handler %r', self._pending[0][1])
+ else:
+ logging.error('Unexpected answer: %r', packet)
+ self._connection.abort()
+ handler.peerBroken()
+
+ def setHandler(self, handler):
+ if len(self._pending) == 1 and not self._pending[0][0]:
+ # nothing is pending, change immediately
+ logging.debug('Set handler %r', handler)
+ self._pending[0][1] = handler
+ else:
+ # put the next handler in queue
+ logging.debug('Delay handler %r', handler)
+ self._pending.append([{}, handler])
+
+
class BaseConnection(object):
"""A base connection."""
@@ -71,7 +128,7 @@
self.em = event_manager
self.connector = connector
self.addr = addr
- self.handler = handler
+ self._handlers = HandlerSwitcher(self, handler)
if connector is not None:
self.connector_handler = connector.__class__
event_manager.register(self)
@@ -117,10 +174,10 @@
__del__ = close
def getHandler(self):
- return self.handler
+ return self._handlers.getHandler()
def setHandler(self, handler):
- self.handler = handler
+ self._handlers.setHandler(handler)
def getEventManager(self):
return self.em
@@ -141,9 +198,6 @@
return False
def hasPendingMessages(self):
- return False
-
- def hasPendingRequests(self):
return False
def whoSetConnector(self):
@@ -200,25 +254,12 @@
self.aborted = False
self.uuid = None
self._queue = []
- self._expected = deque()
- self._next_handler = None
self._on_close = None
BaseConnection.__init__(self, event_manager, handler,
connector = connector, addr = addr,
connector_handler = connector_handler)
if connector is not None:
event_manager.addReader(self)
-
- def setHandler(self, handler):
- assert self._next_handler is None
- if self.hasPendingRequests():
- logging.debug('Delaying: %s -> %s after %s',
- self.handler.__class__.__name__, handler.__class__.__name__,
- list(self._expected))
- self._expected.append(APPLY_HANDLER)
- self._next_handler = handler
- else:
- self.handler = handler
def setOnClose(self, callback):
assert self._on_close is None
@@ -256,7 +297,7 @@
self.event_dict.clear()
self.write_buf = ""
self.read_buf = ""
- self._expected.clear()
+ self._handlers.clear()
def abort(self):
"""Abort dealing with this connection."""
@@ -290,7 +331,7 @@
if packet is None:
break
except PacketMalformedError, msg:
- self.handler._packetMalformed(self, msg)
+ self.getHandler()._packetMalformed(self, msg)
return
self.read_buf = self.read_buf[len(packet):]
@@ -319,39 +360,13 @@
"""
return len(self._queue) != 0
- def hasPendingRequests(self):
- """
- Returns True if there are pending expected answer packets
- """
- return bool(self._expected)
-
def process(self):
"""
Process a pending packet.
"""
- # check out packet and check if it's and expected answer
+ # check out packet and process it with current handler
packet = self._queue.pop(0)
- if packet.isResponse():
- request = None
- if self._expected:
- request = self._expected.popleft()
- if request is APPLY_HANDLER:
- logging.debug('Apply handler %s',
- self._next_handler.__class__.__name__)
- self.handler = self._next_handler
- self._next_handler = None
- return
- if not request or not request.answerMatch(packet):
- req_info = ('', '')
- if request is not None:
- req_info = (request.getId(), request.__class__)
- rep_info = (packet.getId(), packet.__class__)
- logging.warning('Unexpected answer: %s:%s %s:%s' %
- (rep_info + req_info))
-
- # process packet
- PACKET_LOGGER.dispatch(self, packet, 'from')
- self.handler.packetReceived(self, packet)
+ self._handlers.handle(packet)
def pending(self):
return self.connector is not None and self.write_buf
@@ -359,7 +374,7 @@
def _closure(self):
assert self.connector is not None, self.whoSetConnector()
self.close()
- self.handler.connectionClosed(self)
+ self.getHandler().connectionClosed(self)
def _recv(self):
"""Receive data from a connector."""
@@ -375,7 +390,7 @@
except ConnectorConnectionRefusedException:
# should only occur while connecting
self.close()
- self.handler.connectionFailed(self)
+ self.getHandler().connectionFailed(self)
except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false
@@ -469,8 +484,7 @@
self.expectMessage(msg_id, timeout=timeout,
additional_timeout=additional_timeout)
self._addPacket(packet)
- assert packet.getAnswer() is not None, packet
- self._expected.append(packet)
+ self._handlers.emit(packet)
return msg_id
@not_closed
@@ -509,7 +523,7 @@
event_manager.addWriter(self)
else:
self.connecting = False
- self.handler.connectionCompleted(self)
+ self.getHandler().connectionCompleted(self)
event_manager.addReader(self)
except ConnectorConnectionRefusedException:
handler.connectionFailed(self)
@@ -525,12 +539,12 @@
if self.connecting:
err = self.connector.getError()
if err:
- self.handler.connectionFailed(self)
+ self.getHandler().connectionFailed(self)
self.close()
return
else:
self.connecting = False
- self.handler.connectionCompleted(self)
+ self.getHandler().connectionCompleted(self)
self.em.addReader(self)
else:
Connection.writable(self)
@@ -594,8 +608,7 @@
self.dispatcher.register(self, msg_id, queue)
self.expectMessage(msg_id)
self._addPacket(packet)
- assert packet.getAnswer() is not None, packet
- self._expected.append(packet)
+ self._handlers.emit(packet)
return msg_id
@lockCheckWrapper
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Mon Mar 1 11:54:45 2010
@@ -294,16 +294,13 @@
assert body == '', "Non-empty packet decoding not implemented """
return ()
+ def isError(self):
+ return isinstance(self, Error)
+
def isResponse(self):
return self._code & RESPONSE_MASK == RESPONSE_MASK
- def answerMatch(self, answer):
- id_match = self._id == answer._id
- is_error = answer.__class__ == Error
- assert self._answer is not None
- return id_match and (is_error or isinstance(answer, self._answer))
-
- def getAnswer(self):
+ def getAnswerClass(self):
return self._answer
More information about the Neo-report
mailing list