[Neo-report] r1926 gregory - in /trunk/neo: ./ client/ tests/client/
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Mar 8 21:44:16 CET 2010
Author: gregory
Date: Mon Mar 8 21:44:16 2010
New Revision: 1926
Log:
Take locks around connection's ask/notify instead of app.
Nothing is protected between lock/unlock; only writes to local_var are done,
and they do not need exclusive access because local_var is thread-specific.
Modified:
trunk/neo/client/app.py
trunk/neo/client/pool.py
trunk/neo/connection.py
trunk/neo/tests/client/testClientApp.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Mar 8 21:44:16 2010
@@ -245,22 +245,14 @@
@profiler_decorator
def _askStorage(self, conn, packet):
""" Send a request to a storage node and process it's answer """
- try:
- msg_id = conn.ask(packet)
- finally:
- # assume that the connection was already locked
- conn.unlock()
+ msg_id = conn.ask(packet)
self._waitMessage(conn, msg_id, self.storage_handler)
@profiler_decorator
def _askPrimary(self, packet):
""" Send a request to the primary master and process it's answer """
conn = self._getMasterConnection()
- conn.lock()
- try:
- msg_id = conn.ask(packet)
- finally:
- conn.unlock()
+ msg_id = conn.ask(packet)
self._waitMessage(conn, msg_id, self.primary_handler)
@profiler_decorator
@@ -327,16 +319,12 @@
connector=self.connector_handler(),
dispatcher=self.dispatcher)
# Query for primary master node
- conn.lock()
- try:
- if conn.getConnector() is None:
- # This happens if a connection could not be established.
- logging.error('Connection to master node %s failed',
- self.trying_master_node)
- continue
- msg_id = conn.ask(Packets.AskPrimary())
- finally:
- conn.unlock()
+ if conn.getConnector() is None:
+ # This happens if a connection could not be established.
+ logging.error('Connection to master node %s failed',
+ self.trying_master_node)
+ continue
+ msg_id = conn.ask(Packets.AskPrimary())
try:
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
@@ -349,18 +337,14 @@
logging.info('connected to a primary master node')
# Identify to primary master and request initial data
while conn.getUUID() is None:
- conn.lock()
- try:
- if conn.getConnector() is None:
- logging.error('Connection to master node %s lost',
- self.trying_master_node)
- self.primary_master_node = None
- break
- p = Packets.RequestIdentification(NodeTypes.CLIENT,
- self.uuid, None, self.name)
- msg_id = conn.ask(p)
- finally:
- conn.unlock()
+ if conn.getConnector() is None:
+ logging.error('Connection to master node %s lost',
+ self.trying_master_node)
+ self.primary_master_node = None
+ break
+ p = Packets.RequestIdentification(NodeTypes.CLIENT,
+ self.uuid, None, self.name)
+ msg_id = conn.ask(p)
try:
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
@@ -371,18 +355,10 @@
# Node identification was refused by master.
sleep(1)
if self.uuid is not None:
- conn.lock()
- try:
- msg_id = conn.ask(Packets.AskNodeInformation())
- finally:
- conn.unlock()
+ msg_id = conn.ask(Packets.AskNodeInformation())
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
- conn.lock()
- try:
- msg_id = conn.ask(Packets.AskPartitionTable([]))
- finally:
- conn.unlock()
+ msg_id = conn.ask(Packets.AskPartitionTable([]))
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
ready = self.uuid is not None and self.pt is not None \
@@ -603,10 +579,7 @@
if conn is None:
continue
try:
- try:
- conn.ask(p)
- finally:
- conn.unlock()
+ conn.ask(p)
except ConnectionClosed:
continue
@@ -749,18 +722,11 @@
conn = self.cp.getConnForCell(cell)
if conn is None:
continue
- try:
- conn.notify(Packets.AbortTransaction(self.local_var.tid))
- finally:
- conn.unlock()
+ conn.notify(Packets.AbortTransaction(self.local_var.tid))
# Abort the transaction in the primary master node.
conn = self._getMasterConnection()
- conn.lock()
- try:
- conn.notify(Packets.AbortTransaction(self.local_var.tid))
- finally:
- conn.unlock()
+ conn.notify(Packets.AbortTransaction(self.local_var.tid))
self.local_var.clear()
@profiler_decorator
@@ -877,11 +843,7 @@
conn = self.cp.getConnForNode(storage_node)
if conn is None:
continue
-
- try:
- conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
- finally:
- conn.unlock()
+ conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
# Wait for answers from all storages.
while len(self.local_var.node_tids) != len(storage_node_list):
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Mar 8 21:44:16 2010
@@ -116,7 +116,6 @@
return None
self.connection_dict[node.getUUID()] = conn
- conn.lock()
return conn
@profiler_decorator
@@ -135,7 +134,6 @@
try:
conn = self.connection_dict[uuid]
# Already connected to node
- conn.lock()
return conn
except KeyError:
# Create new connection to node
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar 8 21:44:16 2010
@@ -596,20 +596,26 @@
def analyse(self, *args, **kw):
return super(MTClientConnection, self).analyse(*args, **kw)
- @lockCheckWrapper
def notify(self, *args, **kw):
- return super(MTClientConnection, self).notify(*args, **kw)
-
- @lockCheckWrapper
+ self.lock()
+ try:
+ return super(MTClientConnection, self).notify(*args, **kw)
+ finally:
+ self.unlock()
+
def ask(self, packet, timeout=CRITICAL_TIMEOUT):
- msg_id = self._getNextId()
- packet.setId(msg_id)
- self.dispatcher.register(self, msg_id, self._local_var.queue)
- self._addPacket(packet)
- if not self._handlers.isPending():
- self._timeout.update(time(), timeout=timeout)
- self._handlers.emit(packet)
- return msg_id
+ self.lock()
+ try:
+ msg_id = self._getNextId()
+ packet.setId(msg_id)
+ self.dispatcher.register(self, msg_id, self._local_var.queue)
+ self._addPacket(packet)
+ if not self._handlers.isPending():
+ self._timeout.update(time(), timeout=timeout)
+ self._handlers.emit(packet)
+ return msg_id
+ finally:
+ self.unlock()
@lockCheckWrapper
def answer(self, *args, **kw):
Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Mon Mar 8 21:44:16 2010
@@ -950,7 +950,6 @@
Application._waitMessage = _waitMessage_old
# check packet sent, connection unlocked and dispatcher updated
self.checkAskNewTid(conn)
- self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
self.checkDispatcherRegisterCalled(app, conn)
# and _waitMessage called
self.assertTrue(self.test_ok)
@@ -976,8 +975,6 @@
Application._waitMessage = _waitMessage_old
# check packet sent, connection locked during process and dispatcher updated
self.checkAskNewTid(conn)
- self.assertEquals(len(conn.mockGetNamedCalls('lock')), 1)
- self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
self.checkDispatcherRegisterCalled(app, conn)
# and _waitMessage called
self.assertTrue(self.test_ok)
More information about the Neo-report
mailing list