[Neo-report] r1925 gregory - in /trunk/neo: ./ client/ tests/client/

nobody at svn.erp5.org
Mon Mar 8 21:44:04 CET 2010


Author: gregory
Date: Mon Mar  8 21:44:04 2010
New Revision: 1925

Log:
MTClientConnection now handles the local queue itself, to unify the ask() prototype.
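
In short, callers no longer thread the per-thread answer queue through every
ask() call; the connection keeps a reference to the thread-local container and
fetches the queue itself. A minimal before/after sketch, taken directly from
the call sites changed below:

    # before r1925: every call site had to pass the queue explicitly
    msg_id = conn.ask(self.local_var.queue, packet)

    # after r1925: the connection looks the queue up on its own
    msg_id = conn.ask(packet)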

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/pool.py
    trunk/neo/connection.py
    trunk/neo/tests/client/testClientApp.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Mar  8 21:44:04 2010
@@ -246,7 +246,7 @@
     def _askStorage(self, conn, packet):
         """ Send a request to a storage node and process it's answer """
         try:
-            msg_id = conn.ask(self.local_var.queue, packet)
+            msg_id = conn.ask(packet)
         finally:
             # assume that the connection was already locked
             conn.unlock()
@@ -258,7 +258,7 @@
         conn = self._getMasterConnection()
         conn.lock()
         try:
-            msg_id = conn.ask(self.local_var.queue, packet)
+            msg_id = conn.ask(packet)
         finally:
             conn.unlock()
         self._waitMessage(conn, msg_id, self.primary_handler)
@@ -321,7 +321,8 @@
                         self.trying_master_node = master_list[0]
                     index += 1
                 # Connect to master
-                conn = MTClientConnection(self.em, self.notifications_handler,
+                conn = MTClientConnection(self.local_var, self.em,
+                        self.notifications_handler,
                         addr=self.trying_master_node.getAddress(),
                         connector=self.connector_handler(),
                         dispatcher=self.dispatcher)
@@ -333,8 +334,7 @@
                         logging.error('Connection to master node %s failed',
                                       self.trying_master_node)
                         continue
-                    msg_id = conn.ask(self.local_var.queue,
-                            Packets.AskPrimary())
+                    msg_id = conn.ask(Packets.AskPrimary())
                 finally:
                     conn.unlock()
                 try:
@@ -358,7 +358,7 @@
                         break
                     p = Packets.RequestIdentification(NodeTypes.CLIENT,
                             self.uuid, None, self.name)
-                    msg_id = conn.ask(self.local_var.queue, p)
+                    msg_id = conn.ask(p)
                 finally:
                     conn.unlock()
                 try:
@@ -373,16 +373,14 @@
             if self.uuid is not None:
                 conn.lock()
                 try:
-                    msg_id = conn.ask(self.local_var.queue,
-                                      Packets.AskNodeInformation())
+                    msg_id = conn.ask(Packets.AskNodeInformation())
                 finally:
                     conn.unlock()
                 self._waitMessage(conn, msg_id,
                         handler=self.primary_bootstrap_handler)
                 conn.lock()
                 try:
-                    msg_id = conn.ask(self.local_var.queue,
-                                      Packets.AskPartitionTable([]))
+                    msg_id = conn.ask(Packets.AskPartitionTable([]))
                 finally:
                     conn.unlock()
                 self._waitMessage(conn, msg_id,
@@ -600,14 +598,13 @@
         # Store data on each node
         self.local_var.object_stored_counter_dict[oid] = 0
         self.local_var.object_serial_dict[oid] = (serial, version)
-        local_queue = self.local_var.queue
         for cell in cell_list:
             conn = self.cp.getConnForCell(cell)
             if conn is None:
                 continue
             try:
                 try:
-                    conn.ask(local_queue, p)
+                    conn.ask(p)
                 finally:
                     conn.unlock()
             except ConnectionClosed:
@@ -882,8 +879,7 @@
                 continue
 
             try:
-                conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
-                    INVALID_PARTITION))
+                conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
             finally:
                 conn.unlock()
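
The app.py hunks above mostly follow the same lock/ask/unlock/wait pattern;
only the queue argument disappears. A condensed sketch of that pattern,
assuming the names used in the hunks (ask_and_wait itself is a hypothetical
helper, not a NEO function):

    def ask_and_wait(app, conn, packet, handler):
        conn.lock()
        try:
            # the thread-local queue is no longer passed here
            msg_id = conn.ask(packet)
        finally:
            conn.unlock()
        # block until the answer for msg_id has been processed
        app._waitMessage(conn, msg_id, handler)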
 

Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Mar  8 21:44:04 2010
@@ -50,7 +50,8 @@
         while True:
             logging.debug('trying to connect to %s - %s', node, node.getState())
             app.setNodeReady()
-            conn = MTClientConnection(app.em, app.storage_event_handler, addr,
+            conn = MTClientConnection(app.local_var, app.em,
+                app.storage_event_handler, addr,
                 connector=app.connector_handler(), dispatcher=app.dispatcher)
             conn.lock()
 
@@ -62,7 +63,7 @@
 
                 p = Packets.RequestIdentification(NodeTypes.CLIENT,
                             app.uuid, None, app.name)
-                msg_id = conn.ask(app.local_var.queue, p)
+                msg_id = conn.ask(p)
             finally:
                 conn.unlock()
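
On the pool side the only change is the extra first argument when the storage
connection is created. An annotated restatement of the new wiring (app and
addr are the objects already used in the hunk above; the comments describe
their usual roles in the NEO client and are not part of the diff):

    conn = MTClientConnection(
        app.local_var,                 # thread-local container holding the answer queue (new in r1925)
        app.em,                        # the application's event manager
        app.storage_event_handler,     # handler for storage node events
        addr,                          # address of the storage node
        connector=app.connector_handler(),
        dispatcher=app.dispatcher)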
 

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar  8 21:44:04 2010
@@ -565,9 +565,10 @@
 class MTClientConnection(ClientConnection):
     """A Multithread-safe version of ClientConnection."""
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, local_var, *args, **kwargs):
         # _lock is only here for lock debugging purposes. Do not use.
         self._lock = lock = RLock()
+        self._local_var = local_var
         self.acquire = lock.acquire
         self.release = lock.release
         self.dispatcher = kwargs.pop('dispatcher')
@@ -600,10 +601,10 @@
         return super(MTClientConnection, self).notify(*args, **kw)
 
     @lockCheckWrapper
-    def ask(self, queue, packet, timeout=CRITICAL_TIMEOUT):
+    def ask(self, packet, timeout=CRITICAL_TIMEOUT):
         msg_id = self._getNextId()
         packet.setId(msg_id)
-        self.dispatcher.register(self, msg_id, queue)
+        self.dispatcher.register(self, msg_id, self._local_var.queue)
         self._addPacket(packet)
         if not self._handlers.isPending():
             self._timeout.update(time(), timeout=timeout)
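
The two hunks above are the heart of the change: the thread-local container is
stored at construction time and its queue is handed to the dispatcher inside
ask(), so callers stop passing it around. The self-contained sketch below
condenses that mechanism; apart from the _local_var / dispatcher attributes
and the ask()/register() interplay, every name is a stand-in rather than NEO's
real API (Python 2, as used by NEO at the time):

    from Queue import Queue

    class LocalVar(object):
        """Stand-in for the per-thread container holding the answer queue."""
        def __init__(self):
            self.queue = Queue()

    class Dispatcher(object):
        """Stand-in dispatcher: remembers which queue receives each answer."""
        def __init__(self):
            self.registry = {}
        def register(self, conn, msg_id, queue):
            self.registry[(conn, msg_id)] = queue

    class Connection(object):
        """Stand-in for MTClientConnection after r1925."""
        def __init__(self, local_var, dispatcher):
            self._local_var = local_var      # kept for the connection's lifetime
            self.dispatcher = dispatcher
            self._next_id = 0
        def ask(self, packet):
            msg_id = self._next_id
            self._next_id += 1
            # the queue now comes from the connection's own local_var
            self.dispatcher.register(self, msg_id, self._local_var.queue)
            return msg_id

    local_var = LocalVar()
    conn = Connection(local_var, Dispatcher())
    msg_id = conn.ask('AskPrimary packet')   # no queue argument any more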

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Mon Mar  8 21:44:04 2010
@@ -76,7 +76,7 @@
         calls = conn.mockGetNamedCalls('ask')
         self.assertEquals(len(calls), 1)
         # the packet is now the first parameter of ask() (the queue is no longer passed)
-        packet = calls[0].getParam(1)
+        packet = calls[0].getParam(0)
         self.assertTrue(isinstance(packet, Packet))
         self.assertEquals(packet.getType(), packet_type)
         if decode:
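
With the queue gone, the packet is the first positional argument of ask(),
hence getParam(1) becoming getParam(0). A tiny illustration of what the
updated assertion checks, using a hand-rolled recorder instead of NEO's Mock
helper (RecordingConnection is hypothetical):

    class RecordingConnection(object):
        def __init__(self):
            self.calls = []
        def ask(self, packet):
            self.calls.append(packet)

    conn = RecordingConnection()
    conn.ask('some packet')
    # the first (and only) positional argument is now the packet itself
    assert conn.calls[0] == 'some packet'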
