[Neo-report] r2357 gregory - in /trunk/neo: ./ client/

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Oct 27 18:24:42 CEST 2010


Author: gregory
Date: Wed Oct 27 18:24:40 2010
New Revision: 2357

Log:
Check if an MT connection is closed in ask()

This solves a race condition where the client's thread poll triggers
connection(Lost|Failed) before a call to ask(). In that case an answer was
registered as expected and never cancelled, which led to a frozen client.

- Split _connectToPrimaryNode
- Move ConnectionClosed at generic level

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/exception.py
    trunk/neo/client/pool.py
    trunk/neo/connection.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -34,11 +34,11 @@ from neo.protocol import NodeTypes, Pack
 from neo.event import EventManager
 from neo.util import makeChecksum as real_makeChecksum, dump
 from neo.locking import Lock
-from neo.connection import MTClientConnection, OnTimeout
+from neo.connection import MTClientConnection, OnTimeout, ConnectionClosed
 from neo.node import NodeManager
 from neo.connector import getConnectorHandler
 from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError
-from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
+from neo.client.exception import NEOStorageNotFoundError
 from neo.exception import NeoException
 from neo.client.handlers import storage, master
 from neo.dispatcher import Dispatcher, ForgottenPacket
@@ -314,6 +314,9 @@ class Application(object):
 
     @profiler_decorator
     def _connectToPrimaryNode(self):
+        """
+            Lookup for the current primary master node
+        """
         logging.debug('connecting to primary master...')
         ready = False
         nm = self.nm
@@ -349,8 +352,8 @@ class Application(object):
                     logging.error('Connection to master node %s failed',
                                   self.trying_master_node)
                     continue
-                msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
                 try:
+                    msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
                     self._waitMessage(conn, msg_id,
                             handler=self.primary_bootstrap_handler)
                 except ConnectionClosed:
@@ -358,40 +361,44 @@ class Application(object):
                 # If we reached the primary master node, mark as connected
                 connected = self.primary_master_node is not None and \
                         self.primary_master_node is self.trying_master_node
-
-            logging.info('connected to a primary master node')
-            # Identify to primary master and request initial data
-            while conn.getUUID() is None:
-                if conn.getConnector() is None:
-                    logging.error('Connection to master node %s lost',
-                                  self.trying_master_node)
-                    self.primary_master_node = None
-                    break
-                p = Packets.RequestIdentification(NodeTypes.CLIENT,
-                        self.uuid, None, self.name)
-                msg_id = conn.ask(p, queue=queue)
-                try:
-                    self._waitMessage(conn, msg_id,
-                            handler=self.primary_bootstrap_handler)
-                except ConnectionClosed:
-                    self.primary_master_node = None
-                    break
-                if conn.getUUID() is None:
-                    # Node identification was refused by master.
-                    time.sleep(1)
-            if self.uuid is not None:
-                msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
-                self._waitMessage(conn, msg_id,
-                        handler=self.primary_bootstrap_handler)
-                msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
-                self._waitMessage(conn, msg_id,
-                        handler=self.primary_bootstrap_handler)
-            ready = self.uuid is not None and self.pt is not None \
-                                 and self.pt.operational()
-        logging.info("connected to primary master node %s" %
-                self.primary_master_node)
+            logging.info('Connected to %s' % (self.primary_master_node, ))
+            try:
+                ready = self.identifyToPrimaryNode(conn)
+            except ConnectionClosed:
+                logging.error('Connection to %s lost', self.trying_master_node)
+                self.primary_master_node = None
+                continue
+        logging.info("Connected and ready")
         return conn
 
+    def identifyToPrimaryNode(self, conn):
+        """
+            Request identification and required informations to be operational.
+            Might raise ConnectionClosed so that the new primary can be
+            looked-up again.
+        """
+        logging.info('Initializing from master')
+        queue = self.local_var.queue
+        # Identify to primary master and request initial data
+        while conn.getUUID() is None:
+            p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid,
+                    None, self.name)
+            self._waitMessage(conn, conn.ask(p, queue=queue),
+                    handler=self.primary_bootstrap_handler)
+            if conn.getUUID() is None:
+                # Node identification was refused by master, it is considered
+                # as the primary as long as we are connected to it.
+                time.sleep(1)
+        if self.uuid is not None:
+            msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
+            self._waitMessage(conn, msg_id,
+                    handler=self.primary_bootstrap_handler)
+            msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
+            self._waitMessage(conn, msg_id,
+                    handler=self.primary_bootstrap_handler)
+        return self.uuid is not None and self.pt is not None \
+                             and self.pt.operational()
+
     def registerDB(self, db, limit):
         self._db = db
 

Modified: trunk/neo/client/exception.py
==============================================================================
--- trunk/neo/client/exception.py [iso-8859-1] (original)
+++ trunk/neo/client/exception.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -17,9 +17,6 @@
 
 from ZODB import POSException
 
-class ConnectionClosed(Exception):
-    pass
-
 class NEOStorageError(POSException.StorageError):
     pass
 

Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -19,8 +19,7 @@
 from neo import logging
 from neo.locking import RLock
 from neo.protocol import NodeTypes, Packets
-from neo.connection import MTClientConnection
-from neo.client.exception import ConnectionClosed
+from neo.connection import MTClientConnection, ConnectionClosed
 from neo.profiling import profiler_decorator
 import time
 

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Wed Oct 27 18:24:40 2010
@@ -36,6 +36,9 @@ PING_TIMEOUT = 5
 INCOMING_TIMEOUT = 10
 CRITICAL_TIMEOUT = 30
 
+class ConnectionClosed(Exception):
+    pass
+
 def not_closed(func):
     def decorator(self, *args, **kw):
         if self.connector is None:
@@ -734,6 +737,8 @@ class MTClientConnection(ClientConnectio
             queue=None):
         self.lock()
         try:
+            if self.isClosed():
+                raise ConnectionClosed
             # XXX: Here, we duplicate Connection.ask because we need to call
             # self.dispatcher.register after setId is called and before
             # _addPacket is called.





More information about the Neo-report mailing list