[Neo-report] r2419 vincent - in /trunk/neo: ./ master/handlers/ storage/handlers/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Nov 5 18:43:26 CET 2010
Author: vincent
Date: Fri Nov 5 18:43:26 2010
New Revision: 2419
Log:
Ignore some requests, based on connection state.
Some requests can be safely ignored when received over a closed connection.
This was previously done explicitly in handlers, but it turns out it would
cause a lot of code duplication. Instead, define the policy on a per-packet-type
basis, and apply it to all packets upon reception, before passing them
to the handler.
Also, protect request handlers when they respond, as the connection might
already be closed.
Modified:
trunk/neo/connection.py
trunk/neo/master/handlers/__init__.py
trunk/neo/master/handlers/election.py
trunk/neo/master/handlers/storage.py
trunk/neo/protocol.py
trunk/neo/storage/handlers/master.py
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Fri Nov 5 18:43:26 2010
@@ -162,6 +162,10 @@ class HandlerSwitcher(object):
def _handle(self, connection, packet):
assert len(self._pending) == 1 or self._pending[0][0]
PACKET_LOGGER.dispatch(connection, packet, 'from')
+ if connection.isClosed() and packet.ignoreOnClosedConnection():
+ neo.logging.debug('Ignoring packet %r on closed connection %r',
+ packet, connection)
+ return
msg_id = packet.getId()
(request_dict, handler) = self._pending[0]
# notifications are not expected
Modified: trunk/neo/master/handlers/__init__.py
==============================================================================
--- trunk/neo/master/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/__init__.py [iso-8859-1] Fri Nov 5 18:43:26 2010
@@ -28,13 +28,6 @@ class MasterHandler(EventHandler):
neo.logging.error('Protocol error %s %s', message, conn.getAddress())
def askPrimary(self, conn):
- if conn.getConnector() is None:
- # Connection can be closed by peer after he sent AskPrimary
- # if he finds the primary master before we answer him.
- # The connection gets closed before this message gets processed
- # because this message might have been queued, but connection
- # interruption takes effect as soon as received.
- return
app = self.app
if app.primary:
primary_uuid = app.uuid
Modified: trunk/neo/master/handlers/election.py
==============================================================================
--- trunk/neo/master/handlers/election.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/election.py [iso-8859-1] Fri Nov 5 18:43:26 2010
@@ -193,14 +193,6 @@ class ServerElectionHandler(MasterHandle
def requestIdentification(self, conn, node_type,
uuid, address, name):
- if conn.getConnector() is None:
- # Connection can be closed by peer after he sent
- # RequestIdentification if he finds the primary master before
- # we answer him.
- # The connection gets closed before this message gets processed
- # because this message might have been queued, but connection
- # interruption takes effect as soon as received.
- return
self.checkClusterName(name)
app = self.app
if node_type != NodeTypes.MASTER:
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Fri Nov 5 18:43:26 2010
@@ -140,10 +140,8 @@ class StorageServiceHandler(BaseServiceH
uid_set.remove(conn.getUUID())
if not uid_set:
app.packing = None
- try:
+ if not client.isClosed():
client.answer(Packets.AnswerPack(True), msg_id=msg_id)
- except ConnectorConnectionClosedException:
- pass
def notifyReady(self, conn):
self.app.setStorageReady(conn.getUUID())
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri Nov 5 18:43:26 2010
@@ -239,6 +239,7 @@ class Packet(object):
a tuple respectively.
"""
+ _ignore_when_closed = False
_header_format = None
_header_len = None
_request = None
@@ -326,6 +327,13 @@ class Packet(object):
def getAnswerClass(self):
return self._answer
+ def ignoreOnClosedConnection(self):
+ """
+ Tells if this packet must be ignored when its connection is closed
+ when it is handled.
+ """
+ return self._ignore_when_closed
+
class Notify(Packet):
"""
General purpose notification (remote logging)
@@ -1718,7 +1726,7 @@ def initMessage(klass):
klass._header_len = calcsize(klass._header_format)
StaticRegistry = {}
-def register(code, request, answer=None):
+def register(code, request, answer=None, ignore_when_closed=None):
""" Register a packet in the packet registry """
# register the request
# assert code & RESPONSE_MASK == 0
@@ -1727,6 +1735,13 @@ def register(code, request, answer=None)
request._code = code
request._answer = answer
StaticRegistry[code] = request
+ if ignore_when_closed is None:
+ # By default, on a closed connection:
+ # - request: ignore
+ # - answer: keep
+ # - notification: keep
+ ignore_when_closed = answer is not None
+ request._ignore_when_closed = ignore_when_closed
if answer not in (None, Error):
initMessage(answer)
# compute the answer code
@@ -1842,11 +1857,15 @@ class PacketRegistry(dict):
AskFinishTransaction, AnswerTransactionFinished = register(
0x0013,
AskFinishTransaction,
- AnswerTransactionFinished)
+ AnswerTransactionFinished,
+ ignore_when_closed=False,
+ )
AskLockInformation, AnswerInformationLocked = register(
0x0014,
AskLockInformation,
- AnswerInformationLocked)
+ AnswerInformationLocked,
+ ignore_when_closed=False,
+ )
InvalidateObjects = register(0x0015, InvalidateObjects)
NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation)
AskNewOIDs, AnswerNewOIDs = register(
@@ -1889,11 +1908,15 @@ class PacketRegistry(dict):
SetNodeState = register(
0x0023,
SetNodeState,
- Error)
+ Error,
+ ignore_when_closed=False,
+ )
AddPendingNodes = register(
0x0024,
AddPendingNodes,
- Error)
+ Error,
+ ignore_when_closed=False,
+ )
AskNodeInformation, AnswerNodeInformation = register(
0x0025,
AskNodeInformation,
@@ -1901,7 +1924,9 @@ class PacketRegistry(dict):
SetClusterState = register(
0x0026,
SetClusterState,
- Error)
+ Error,
+ ignore_when_closed=False,
+ )
NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
AskClusterState, AnswerClusterState = register(
0x0028,
@@ -1932,7 +1957,9 @@ class PacketRegistry(dict):
AskPack, AnswerPack = register(
0x0038,
AskPack,
- AnswerPack)
+ AnswerPack,
+ ignore_when_closed=False,
+ )
AskCheckTIDRange, AnswerCheckTIDRange = register(
0x0039,
AskCheckTIDRange,
Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Fri Nov 5 18:43:26 2010
@@ -57,7 +57,8 @@ class MasterOperationHandler(BaseMasterH
if not tid in self.app.tm:
raise ProtocolError('Unknown transaction')
self.app.tm.lock(tid, oid_list)
- conn.answer(Packets.AnswerInformationLocked(tid))
+ if not conn.isClosed():
+ conn.answer(Packets.AnswerInformationLocked(tid))
def notifyUnlockInformation(self, conn, tid):
if not tid in self.app.tm:
@@ -70,5 +71,6 @@ class MasterOperationHandler(BaseMasterH
neo.logging.info('Pack started, up to %s...', dump(tid))
app.dm.pack(tid, app.tm.updateObjectDataForPack)
neo.logging.info('Pack finished.')
- conn.answer(Packets.AnswerPack(True))
+ if not conn.isClosed():
+ conn.answer(Packets.AnswerPack(True))
More information about the Neo-report
mailing list