[Neo-report] r1926 gregory - in /trunk/neo: ./ client/ tests/client/

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Mar 8 21:44:16 CET 2010


Author: gregory
Date: Mon Mar  8 21:44:16 2010
New Revision: 1926

Log:
Take locks around connection's ask/notify instead of app.

Nothing is protected between lock/unlock; only writes to local_var are done,
and they do not need exclusive access because local_var is thread-specific.

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/pool.py
    trunk/neo/connection.py
    trunk/neo/tests/client/testClientApp.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Mar  8 21:44:16 2010
@@ -245,22 +245,14 @@
     @profiler_decorator
     def _askStorage(self, conn, packet):
         """ Send a request to a storage node and process it's answer """
-        try:
-            msg_id = conn.ask(packet)
-        finally:
-            # assume that the connection was already locked
-            conn.unlock()
+        msg_id = conn.ask(packet)
         self._waitMessage(conn, msg_id, self.storage_handler)
 
     @profiler_decorator
     def _askPrimary(self, packet):
         """ Send a request to the primary master and process it's answer """
         conn = self._getMasterConnection()
-        conn.lock()
-        try:
-            msg_id = conn.ask(packet)
-        finally:
-            conn.unlock()
+        msg_id = conn.ask(packet)
         self._waitMessage(conn, msg_id, self.primary_handler)
 
     @profiler_decorator
@@ -327,16 +319,12 @@
                         connector=self.connector_handler(),
                         dispatcher=self.dispatcher)
                 # Query for primary master node
-                conn.lock()
-                try:
-                    if conn.getConnector() is None:
-                        # This happens if a connection could not be established.
-                        logging.error('Connection to master node %s failed',
-                                      self.trying_master_node)
-                        continue
-                    msg_id = conn.ask(Packets.AskPrimary())
-                finally:
-                    conn.unlock()
+                if conn.getConnector() is None:
+                    # This happens if a connection could not be established.
+                    logging.error('Connection to master node %s failed',
+                                  self.trying_master_node)
+                    continue
+                msg_id = conn.ask(Packets.AskPrimary())
                 try:
                     self._waitMessage(conn, msg_id,
                             handler=self.primary_bootstrap_handler)
@@ -349,18 +337,14 @@
             logging.info('connected to a primary master node')
             # Identify to primary master and request initial data
             while conn.getUUID() is None:
-                conn.lock()
-                try:
-                    if conn.getConnector() is None:
-                        logging.error('Connection to master node %s lost',
-                                      self.trying_master_node)
-                        self.primary_master_node = None
-                        break
-                    p = Packets.RequestIdentification(NodeTypes.CLIENT,
-                            self.uuid, None, self.name)
-                    msg_id = conn.ask(p)
-                finally:
-                    conn.unlock()
+                if conn.getConnector() is None:
+                    logging.error('Connection to master node %s lost',
+                                  self.trying_master_node)
+                    self.primary_master_node = None
+                    break
+                p = Packets.RequestIdentification(NodeTypes.CLIENT,
+                        self.uuid, None, self.name)
+                msg_id = conn.ask(p)
                 try:
                     self._waitMessage(conn, msg_id,
                             handler=self.primary_bootstrap_handler)
@@ -371,18 +355,10 @@
                     # Node identification was refused by master.
                     sleep(1)
             if self.uuid is not None:
-                conn.lock()
-                try:
-                    msg_id = conn.ask(Packets.AskNodeInformation())
-                finally:
-                    conn.unlock()
+                msg_id = conn.ask(Packets.AskNodeInformation())
                 self._waitMessage(conn, msg_id,
                         handler=self.primary_bootstrap_handler)
-                conn.lock()
-                try:
-                    msg_id = conn.ask(Packets.AskPartitionTable([]))
-                finally:
-                    conn.unlock()
+                msg_id = conn.ask(Packets.AskPartitionTable([]))
                 self._waitMessage(conn, msg_id,
                         handler=self.primary_bootstrap_handler)
             ready = self.uuid is not None and self.pt is not None \
@@ -603,10 +579,7 @@
             if conn is None:
                 continue
             try:
-                try:
-                    conn.ask(p)
-                finally:
-                    conn.unlock()
+                conn.ask(p)
             except ConnectionClosed:
                 continue
 
@@ -749,18 +722,11 @@
             conn = self.cp.getConnForCell(cell)
             if conn is None:
                 continue
-            try:
-                conn.notify(Packets.AbortTransaction(self.local_var.tid))
-            finally:
-                conn.unlock()
+            conn.notify(Packets.AbortTransaction(self.local_var.tid))
 
         # Abort the transaction in the primary master node.
         conn = self._getMasterConnection()
-        conn.lock()
-        try:
-            conn.notify(Packets.AbortTransaction(self.local_var.tid))
-        finally:
-            conn.unlock()
+        conn.notify(Packets.AbortTransaction(self.local_var.tid))
         self.local_var.clear()
 
     @profiler_decorator
@@ -877,11 +843,7 @@
             conn = self.cp.getConnForNode(storage_node)
             if conn is None:
                 continue
-
-            try:
-                conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
-            finally:
-                conn.unlock()
+            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
 
         # Wait for answers from all storages.
         while len(self.local_var.node_tids) != len(storage_node_list):

Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Mar  8 21:44:16 2010
@@ -116,7 +116,6 @@
             return None
 
         self.connection_dict[node.getUUID()] = conn
-        conn.lock()
         return conn
 
     @profiler_decorator
@@ -135,7 +134,6 @@
             try:
                 conn = self.connection_dict[uuid]
                 # Already connected to node
-                conn.lock()
                 return conn
             except KeyError:
                 # Create new connection to node

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar  8 21:44:16 2010
@@ -596,20 +596,26 @@
     def analyse(self, *args, **kw):
         return super(MTClientConnection, self).analyse(*args, **kw)
 
-    @lockCheckWrapper
     def notify(self, *args, **kw):
-        return super(MTClientConnection, self).notify(*args, **kw)
-
-    @lockCheckWrapper
+        self.lock()
+        try:
+            return super(MTClientConnection, self).notify(*args, **kw)
+        finally:
+            self.unlock()
+
     def ask(self, packet, timeout=CRITICAL_TIMEOUT):
-        msg_id = self._getNextId()
-        packet.setId(msg_id)
-        self.dispatcher.register(self, msg_id, self._local_var.queue)
-        self._addPacket(packet)
-        if not self._handlers.isPending():
-            self._timeout.update(time(), timeout=timeout)
-        self._handlers.emit(packet)
-        return msg_id
+        self.lock()
+        try:
+            msg_id = self._getNextId()
+            packet.setId(msg_id)
+            self.dispatcher.register(self, msg_id, self._local_var.queue)
+            self._addPacket(packet)
+            if not self._handlers.isPending():
+                self._timeout.update(time(), timeout=timeout)
+            self._handlers.emit(packet)
+            return msg_id
+        finally:
+            self.unlock()
 
     @lockCheckWrapper
     def answer(self, *args, **kw):

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Mon Mar  8 21:44:16 2010
@@ -950,7 +950,6 @@
             Application._waitMessage = _waitMessage_old
         # check packet sent, connection unlocked and dispatcher updated
         self.checkAskNewTid(conn)
-        self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
         self.checkDispatcherRegisterCalled(app, conn)
         # and _waitMessage called
         self.assertTrue(self.test_ok)
@@ -976,8 +975,6 @@
             Application._waitMessage = _waitMessage_old
         # check packet sent, connection locked during process and dispatcher updated
         self.checkAskNewTid(conn)
-        self.assertEquals(len(conn.mockGetNamedCalls('lock')), 1)
-        self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
         self.checkDispatcherRegisterCalled(app, conn)
         # and _waitMessage called
         self.assertTrue(self.test_ok)





More information about the Neo-report mailing list