[Neo-report] r1925 gregory - in /trunk/neo: ./ client/ tests/client/
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Mar 8 21:44:04 CET 2010
Author: gregory
Date: Mon Mar 8 21:44:04 2010
New Revision: 1925
Log:
MTConnection handle local queue to unify ask() prototype.
Modified:
trunk/neo/client/app.py
trunk/neo/client/pool.py
trunk/neo/connection.py
trunk/neo/tests/client/testClientApp.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Mar 8 21:44:04 2010
@@ -246,7 +246,7 @@
def _askStorage(self, conn, packet):
""" Send a request to a storage node and process it's answer """
try:
- msg_id = conn.ask(self.local_var.queue, packet)
+ msg_id = conn.ask(packet)
finally:
# assume that the connection was already locked
conn.unlock()
@@ -258,7 +258,7 @@
conn = self._getMasterConnection()
conn.lock()
try:
- msg_id = conn.ask(self.local_var.queue, packet)
+ msg_id = conn.ask(packet)
finally:
conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler)
@@ -321,7 +321,8 @@
self.trying_master_node = master_list[0]
index += 1
# Connect to master
- conn = MTClientConnection(self.em, self.notifications_handler,
+ conn = MTClientConnection(self.local_var, self.em,
+ self.notifications_handler,
addr=self.trying_master_node.getAddress(),
connector=self.connector_handler(),
dispatcher=self.dispatcher)
@@ -333,8 +334,7 @@
logging.error('Connection to master node %s failed',
self.trying_master_node)
continue
- msg_id = conn.ask(self.local_var.queue,
- Packets.AskPrimary())
+ msg_id = conn.ask(Packets.AskPrimary())
finally:
conn.unlock()
try:
@@ -358,7 +358,7 @@
break
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name)
- msg_id = conn.ask(self.local_var.queue, p)
+ msg_id = conn.ask(p)
finally:
conn.unlock()
try:
@@ -373,16 +373,14 @@
if self.uuid is not None:
conn.lock()
try:
- msg_id = conn.ask(self.local_var.queue,
- Packets.AskNodeInformation())
+ msg_id = conn.ask(Packets.AskNodeInformation())
finally:
conn.unlock()
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
conn.lock()
try:
- msg_id = conn.ask(self.local_var.queue,
- Packets.AskPartitionTable([]))
+ msg_id = conn.ask(Packets.AskPartitionTable([]))
finally:
conn.unlock()
self._waitMessage(conn, msg_id,
@@ -600,14 +598,13 @@
# Store data on each node
self.local_var.object_stored_counter_dict[oid] = 0
self.local_var.object_serial_dict[oid] = (serial, version)
- local_queue = self.local_var.queue
for cell in cell_list:
conn = self.cp.getConnForCell(cell)
if conn is None:
continue
try:
try:
- conn.ask(local_queue, p)
+ conn.ask(p)
finally:
conn.unlock()
except ConnectionClosed:
@@ -882,8 +879,7 @@
continue
try:
- conn.ask(self.local_var.queue, Packets.AskTIDs(first, last,
- INVALID_PARTITION))
+ conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
finally:
conn.unlock()
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Mar 8 21:44:04 2010
@@ -50,7 +50,8 @@
while True:
logging.debug('trying to connect to %s - %s', node, node.getState())
app.setNodeReady()
- conn = MTClientConnection(app.em, app.storage_event_handler, addr,
+ conn = MTClientConnection(app.local_var, app.em,
+ app.storage_event_handler, addr,
connector=app.connector_handler(), dispatcher=app.dispatcher)
conn.lock()
@@ -62,7 +63,7 @@
p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name)
- msg_id = conn.ask(app.local_var.queue, p)
+ msg_id = conn.ask(p)
finally:
conn.unlock()
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar 8 21:44:04 2010
@@ -565,9 +565,10 @@
class MTClientConnection(ClientConnection):
"""A Multithread-safe version of ClientConnection."""
- def __init__(self, *args, **kwargs):
+ def __init__(self, local_var, *args, **kwargs):
# _lock is only here for lock debugging purposes. Do not use.
self._lock = lock = RLock()
+ self._local_var = local_var
self.acquire = lock.acquire
self.release = lock.release
self.dispatcher = kwargs.pop('dispatcher')
@@ -600,10 +601,10 @@
return super(MTClientConnection, self).notify(*args, **kw)
@lockCheckWrapper
- def ask(self, queue, packet, timeout=CRITICAL_TIMEOUT):
+ def ask(self, packet, timeout=CRITICAL_TIMEOUT):
msg_id = self._getNextId()
packet.setId(msg_id)
- self.dispatcher.register(self, msg_id, queue)
+ self.dispatcher.register(self, msg_id, self._local_var.queue)
self._addPacket(packet)
if not self._handlers.isPending():
self._timeout.update(time(), timeout=timeout)
Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Mon Mar 8 21:44:04 2010
@@ -76,7 +76,7 @@
calls = conn.mockGetNamedCalls('ask')
self.assertEquals(len(calls), 1)
# client connection got queue as first parameter
- packet = calls[0].getParam(1)
+ packet = calls[0].getParam(0)
self.assertTrue(isinstance(packet, Packet))
self.assertEquals(packet.getType(), packet_type)
if decode:
More information about the Neo-report
mailing list