[Neo-report] r1773 vincent - in /trunk/neo: client/app.py tests/client/testClientApp.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Feb 16 14:58:24 CET 2010
Author: vincent
Date: Tue Feb 16 14:58:24 2010
New Revision: 1773
Log:
Split _waitMessage into simpler methods.
Modified:
trunk/neo/client/app.py
trunk/neo/tests/client/testClientApp.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Feb 16 14:58:24 2010
@@ -155,41 +155,65 @@
self._nm_acquire = lock.acquire
self._nm_release = lock.release
- def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
- """Wait for a message returned by the dispatcher in queues."""
- local_queue = self.local_var.queue
- while True:
- if msg_id is None:
- try:
- conn, packet = local_queue.get_nowait()
- except Empty:
- break
- else:
- conn, packet = local_queue.get()
- # check fake packet
- if packet is None:
- if conn.getUUID() == target_conn.getUUID():
- raise ConnectionClosed
- else:
- continue
+ def _handlePacket(self, conn, packet, handler=None):
+ """
+ conn
+ The connection which received the packet (forwarded to handler).
+ packet
+ The packet to handle.
+ handler
+ The handler to use to handle packet.
+ If not given, it will be guessed from connection's node type.
+ """
+ if handler is None:
# Guess the handler to use based on the type of node on the
# connection
- if handler is None:
- node = self.nm.getByAddress(conn.getAddress())
- if node is None:
- raise ValueError, 'Expecting an answer from a node ' \
- 'which type is not known... Is this right ?'
- else:
- if node.isStorage():
- handler = self.storage_handler
- elif node.isMaster():
- handler = self.primary_handler
- else:
- raise ValueError, 'Unknown node type: %r' % (
- node.__class__, )
- handler.dispatch(conn, packet)
- if target_conn is conn and msg_id == packet.getId():
+ node = self.nm.getByAddress(conn.getAddress())
+ if node is None:
+ raise ValueError, 'Expecting an answer from a node ' \
+ 'which type is not known... Is this right ?'
+ if node.isStorage():
+ handler = self.storage_handler
+ elif node.isMaster():
+ handler = self.primary_handler
+ else:
+ raise ValueError, 'Unknown node type: %r' % (node.__class__, )
+ handler.dispatch(conn, packet)
+
+ def _waitAnyMessage(self, block=True):
+ """
+ Handle all pending packets.
+ block
+ If True (default), will block until at least one packet was
+ received.
+ """
+ get = self.local_var.queue.get
+ _handlePacket = self._handlePacket
+ while True:
+ try:
+ conn, packet = get(block)
+ except Empty:
break
+ block = False
+ try:
+ _handlePacket(conn, packet)
+ except ConnectionClosed:
+ pass
+
+ def _waitMessage(self, target_conn, msg_id, handler=None):
+ """Wait for a message returned by the dispatcher in queues."""
+ get = self.local_var.queue.get
+ _handlePacket = self._handlePacket
+ while True:
+ conn, packet = get(True)
+ if target_conn is conn:
+ # check fake packet
+ if packet is None:
+ raise ConnectionClosed
+ if msg_id == packet.getId():
+ self._handlePacket(conn, packet, handler=handler)
+ break
+ self._handlePacket(conn, packet)
def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
""" Send a request to a storage node and process its answer """
@@ -756,10 +780,7 @@
# Wait for answers from all storages.
while len(self.local_var.node_tids) != len(storage_node_list):
- try:
- self._waitMessage(handler=self.storage_handler)
- except ConnectionClosed:
- continue
+ self._waitAnyMessage()
# Reorder tids
ordered_tids = set()
@@ -893,7 +914,7 @@
close = __del__
def sync(self):
- self._waitMessage()
+ self._waitAnyMessage(False)
def setNodeReady(self):
self.local_var.node_ready = True
Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Tue Feb 16 14:58:24 2010
@@ -43,12 +43,11 @@
self.master_conn = _getMasterConnection(self)
return self.pt
-def _waitMessage(self, conn=None, msg_id=None, handler=None):
- if conn is not None and handler is not None:
+def _waitMessage(self, conn, msg_id, handler=None):
+ if handler is None:
+ raise NotImplementedError
+ else:
handler.dispatch(conn, conn.fakeReceived())
- else:
- raise NotImplementedError
-
class ClientApplicationTests(NeoTestBase):
@@ -218,7 +217,7 @@
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
- app.local_var.queue = Mock({'get_nowait' : (conn, None)})
+ app.local_var.queue = Mock({'get' : (conn, None)})
app.pt = Mock({ 'getCellListForOID': (cell, ), })
app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1
@@ -776,11 +775,11 @@
'getCellListForTID': ReturnValues([cell1], [cell2]),
})
app.cp = Mock({ 'getConnForCell': conn})
- def _waitMessage(self, conn=None, msg_id=None, handler=None):
+ def _waitAnyMessage(self):
self.local_var.node_tids = {uuid1: (tid1, ), uuid2: (tid2, )}
- Application._waitMessage = _waitMessage_old
- _waitMessage_old = Application._waitMessage
- Application._waitMessage = _waitMessage
+ Application._waitAnyMessage = _waitAnyMessage_old
+ _waitAnyMessage_old = Application._waitAnyMessage
+ Application._waitAnyMessage = _waitAnyMessage
def txn_filter(info):
return info['id'] > '\x00' * 8
result = app.undoLog(0, 4, filter=txn_filter)
@@ -835,42 +834,42 @@
# TODO: test more connection failure cases
# Seventh packet : askNodeInformation succeeded
all_passed = []
- def _waitMessage8(self, conn=None, msg_id=None, handler=None):
+ def _waitMessage8(self, conn, msg_id, handler=None):
all_passed.append(1)
# Sixth packet : askPartitionTable succeeded
- def _waitMessage7(self, conn=None, msg_id=None, handler=None):
+ def _waitMessage7(self, conn, msg_id, handler=None):
app.pt = Mock({'operational': True})
Application._waitMessage = _waitMessage8
# fifth packet : request node identification succeeded
- def _waitMessage6(self, conn=None, msg_id=None, handler=None):
+ def _waitMessage6(self, conn, msg_id, handler=None):
conn.setUUID('D' * 16)
app.uuid = 'C' * 16
Application._waitMessage = _waitMessage7
# fourth iteration : connection to primary master succeeded
- def _waitMessage5(self, conn=None, msg_id=None, handler=None):
+ def _waitMessage5(self, conn, msg_id, handler=None):
app.trying_master_node = app.primary_master_node = Mock({
'getAddress': ('192.168.1.1', 10000),
'__str__': 'Fake master node',
})
Application._waitMessage = _waitMessage6
# third iteration : node not ready
- def _waitMessage4(app, conn=None, msg_id=None, handler=None):
+ def _waitMessage4(app, conn, msg_id, handler=None):
app.setNodeNotReady()
app.trying_master_node = None
Application._waitMessage = _waitMessage5
# second iteration : master node changed
- def _waitMessage3(app, conn=None, msg_id=None, handler=None):
+ def _waitMessage3(app, conn, msg_id, handler=None):
app.primary_master_node = Mock({
'getAddress': ('192.168.1.1', 10000),
'__str__': 'Fake master node',
})
Application._waitMessage = _waitMessage4
# first iteration : connection failed
- def _waitMessage2(app, conn=None, msg_id=None, handler=None):
+ def _waitMessage2(app, conn, msg_id, handler=None):
app.trying_master_node = None
Application._waitMessage = _waitMessage3
# do nothing for the first call
- def _waitMessage1(app, conn=None, msg_id=None, handler=None):
+ def _waitMessage1(app, conn, msg_id, handler=None):
Application._waitMessage = _waitMessage2
_waitMessage_old = Application._waitMessage
Application._waitMessage = _waitMessage1
@@ -892,7 +891,7 @@
app.dispatcher = Mock()
conn = Mock()
self.test_ok = False
- def _waitMessage_hook(app, conn=None, msg_id=None, handler=None):
+ def _waitMessage_hook(app, conn, msg_id, handler=None):
self.test_ok = True
_waitMessage_old = Application._waitMessage
packet = Packets.AskBeginTransaction(None)
@@ -917,7 +916,7 @@
app.master_conn = conn
app.primary_handler = Mock()
self.test_ok = False
- def _waitMessage_hook(app, conn=None, msg_id=None, handler=None):
+ def _waitMessage_hook(app, conn, msg_id, handler=None):
self.assertTrue(handler is app.primary_handler)
self.test_ok = True
_waitMessage_old = Application._waitMessage
More information about the Neo-report
mailing list