[Neo-report] r2357 gregory - in /trunk/neo: ./ client/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Oct 27 18:24:42 CEST 2010
Author: gregory
Date: Wed Oct 27 18:24:40 2010
New Revision: 2357
Log:
Check if a MT connection is closed in ask()
This solve a race condition where the client's thread poll triggers
connection(Lost|Failed) before a call to ask(). In that case a answer was
registered as expected and never cancelled, which lead to a frozen client.
- Split _connectToPrimaryNode
- Move ConnectionClosed at generic level
Modified:
trunk/neo/client/app.py
trunk/neo/client/exception.py
trunk/neo/client/pool.py
trunk/neo/connection.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -34,11 +34,11 @@ from neo.protocol import NodeTypes, Pack
from neo.event import EventManager
from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock
-from neo.connection import MTClientConnection, OnTimeout
+from neo.connection import MTClientConnection, OnTimeout, ConnectionClosed
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError
-from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
+from neo.client.exception import NEOStorageNotFoundError
from neo.exception import NeoException
from neo.client.handlers import storage, master
from neo.dispatcher import Dispatcher, ForgottenPacket
@@ -314,6 +314,9 @@ class Application(object):
@profiler_decorator
def _connectToPrimaryNode(self):
+ """
+ Lookup for the current primary master node
+ """
logging.debug('connecting to primary master...')
ready = False
nm = self.nm
@@ -349,8 +352,8 @@ class Application(object):
logging.error('Connection to master node %s failed',
self.trying_master_node)
continue
- msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
try:
+ msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
except ConnectionClosed:
@@ -358,40 +361,44 @@ class Application(object):
# If we reached the primary master node, mark as connected
connected = self.primary_master_node is not None and \
self.primary_master_node is self.trying_master_node
-
- logging.info('connected to a primary master node')
- # Identify to primary master and request initial data
- while conn.getUUID() is None:
- if conn.getConnector() is None:
- logging.error('Connection to master node %s lost',
- self.trying_master_node)
- self.primary_master_node = None
- break
- p = Packets.RequestIdentification(NodeTypes.CLIENT,
- self.uuid, None, self.name)
- msg_id = conn.ask(p, queue=queue)
- try:
- self._waitMessage(conn, msg_id,
- handler=self.primary_bootstrap_handler)
- except ConnectionClosed:
- self.primary_master_node = None
- break
- if conn.getUUID() is None:
- # Node identification was refused by master.
- time.sleep(1)
- if self.uuid is not None:
- msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
- self._waitMessage(conn, msg_id,
- handler=self.primary_bootstrap_handler)
- msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
- self._waitMessage(conn, msg_id,
- handler=self.primary_bootstrap_handler)
- ready = self.uuid is not None and self.pt is not None \
- and self.pt.operational()
- logging.info("connected to primary master node %s" %
- self.primary_master_node)
+ logging.info('Connected to %s' % (self.primary_master_node, ))
+ try:
+ ready = self.identifyToPrimaryNode(conn)
+ except ConnectionClosed:
+ logging.error('Connection to %s lost', self.trying_master_node)
+ self.primary_master_node = None
+ continue
+ logging.info("Connected and ready")
return conn
+ def identifyToPrimaryNode(self, conn):
+ """
+ Request identification and required informations to be operational.
+ Might raise ConnectionClosed so that the new primary can be
+ looked-up again.
+ """
+ logging.info('Initializing from master')
+ queue = self.local_var.queue
+ # Identify to primary master and request initial data
+ while conn.getUUID() is None:
+ p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid,
+ None, self.name)
+ self._waitMessage(conn, conn.ask(p, queue=queue),
+ handler=self.primary_bootstrap_handler)
+ if conn.getUUID() is None:
+ # Node identification was refused by master, it is considered
+ # as the primary as long as we are connected to it.
+ time.sleep(1)
+ if self.uuid is not None:
+ msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
+ self._waitMessage(conn, msg_id,
+ handler=self.primary_bootstrap_handler)
+ msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
+ self._waitMessage(conn, msg_id,
+ handler=self.primary_bootstrap_handler)
+ return self.uuid is not None and self.pt is not None \
+ and self.pt.operational()
+
def registerDB(self, db, limit):
self._db = db
Modified: trunk/neo/client/exception.py
==============================================================================
--- trunk/neo/client/exception.py [iso-8859-1] (original)
+++ trunk/neo/client/exception.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -17,9 +17,6 @@
from ZODB import POSException
-class ConnectionClosed(Exception):
- pass
-
class NEOStorageError(POSException.StorageError):
pass
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -19,8 +19,7 @@
from neo import logging
from neo.locking import RLock
from neo.protocol import NodeTypes, Packets
-from neo.connection import MTClientConnection
-from neo.client.exception import ConnectionClosed
+from neo.connection import MTClientConnection, ConnectionClosed
from neo.profiling import profiler_decorator
import time
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -36,6 +36,9 @@ PING_TIMEOUT = 5
INCOMING_TIMEOUT = 10
CRITICAL_TIMEOUT = 30
+class ConnectionClosed(Exception):
+ pass
+
def not_closed(func):
def decorator(self, *args, **kw):
if self.connector is None:
@@ -734,6 +737,8 @@ class MTClientConnection(ClientConnectio
queue=None):
self.lock()
try:
+ if self.isClosed():
+ raise ConnectionClosed
# XXX: Here, we duplicate Connection.ask because we need to call
# self.dispatcher.register after setId is called and before
# _addPacket is called.
More information about the Neo-report
mailing list