[Neo-report] r2591 gregory - in /trunk/neo: client/ tests/client/

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Jan 5 18:52:25 CET 2011


Author: gregory
Date: Wed Jan  5 18:52:25 2011
New Revision: 2591

Log:
Make iterateForObject always yield a connection for each node.

Don't block on a non-ready node but do not skip it
Fix test method name and update it (it will never raise StopIteration)

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/pool.py
    trunk/neo/tests/client/testConnectionPool.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Jan  5 18:52:25 2011
@@ -622,31 +622,28 @@ class Application(object):
     def _loadFromStorage(self, oid, at_tid, before_tid):
         self.local_var.asked_object = 0
         packet = Packets.AskObject(oid, at_tid, before_tid)
-        while self.local_var.asked_object == 0:
-            # try without waiting for a node to be ready
-            for node, conn in self.cp.iterateForObject(oid, readable=True,
-                    wait_ready=False):
-                try:
-                    self._askStorage(conn, packet)
-                except ConnectionClosed:
-                    continue
-
-                # Check data
-                noid, tid, next_tid, compression, checksum, data \
-                    = self.local_var.asked_object
-                if noid != oid:
-                    # Oops, try with next node
-                    neo.logging.error('got wrong oid %s instead of %s from %s',
-                        noid, dump(oid), conn)
-                    self.local_var.asked_object = -1
-                    continue
-                elif checksum != makeChecksum(data):
-                    # Check checksum.
-                    neo.logging.error('wrong checksum from %s for oid %s',
-                                  conn, dump(oid))
-                    self.local_var.asked_object = -1
-                    continue
-                break
+        for node, conn in self.cp.iterateForObject(oid, readable=True):
+            try:
+                self._askStorage(conn, packet)
+            except ConnectionClosed:
+                continue
+
+            # Check data
+            noid, tid, next_tid, compression, checksum, data \
+                = self.local_var.asked_object
+            if noid != oid:
+                # Oops, try with next node
+                neo.logging.error('got wrong oid %s instead of %s from %s',
+                    noid, dump(oid), conn)
+                self.local_var.asked_object = -1
+                continue
+            elif checksum != makeChecksum(data):
+                # Check checksum.
+                neo.logging.error('wrong checksum from %s for oid %s',
+                              conn, dump(oid))
+                self.local_var.asked_object = -1
+                continue
+            break
         if self.local_var.asked_object == -1:
             raise NEOStorageError('inconsistent data')
 
@@ -735,8 +732,7 @@ class Application(object):
         add_involved_nodes = self.local_var.involved_nodes.add
         packet = Packets.AskStoreObject(oid, serial, compression,
                  checksum, compressed_data, data_serial, self.local_var.tid)
-        for node, conn in self.cp.iterateForObject(oid, writable=True,
-                wait_ready=True):
+        for node, conn in self.cp.iterateForObject(oid, writable=True):
             try:
                 conn.ask(packet, on_timeout=on_timeout, queue=queue)
                 add_involved_nodes(node)
@@ -865,8 +861,7 @@ class Application(object):
             str(transaction.description), dumps(transaction._extension),
             local_var.data_list)
         add_involved_nodes = self.local_var.involved_nodes.add
-        for node, conn in self.cp.iterateForObject(tid, writable=True,
-                wait_ready=False):
+        for node, conn in self.cp.iterateForObject(tid, writable=True):
             neo.logging.debug("voting object %s on %s", dump(tid),
                 dump(conn.getUUID()))
             try:
@@ -1011,7 +1006,7 @@ class Application(object):
             cell_list = getCellList(partition, readable=True)
             shuffle(cell_list)
             cell_list.sort(key=getCellSortKey)
-            storage_conn = getConnForCell(cell_list[0], wait_ready=False)
+            storage_conn = getConnForCell(cell_list[0])
             storage_conn.ask(Packets.AskObjectUndoSerial(self.local_var.tid,
                 snapshot_tid, undone_tid, oid_list), queue=queue)
 
@@ -1064,8 +1059,7 @@ class Application(object):
 
     def _getTransactionInformation(self, tid):
         packet = Packets.AskTransactionInformation(tid)
-        for node, conn in self.cp.iterateForObject(tid, readable=True,
-                wait_ready=False):
+        for node, conn in self.cp.iterateForObject(tid, readable=True):
             try:
                 self._askStorage(conn, packet)
             except ConnectionClosed:
@@ -1162,8 +1156,7 @@ class Application(object):
     def history(self, oid, version=None, size=1, filter=None):
         # Get history informations for object first
         packet = Packets.AskObjectHistory(oid, 0, size)
-        for node, conn in self.cp.iterateForObject(oid, readable=True,
-                wait_ready=False):
+        for node, conn in self.cp.iterateForObject(oid, readable=True):
             # FIXME: we keep overwriting self.local_var.history here, we
             # should aggregate it instead.
             self.local_var.history = None
@@ -1299,8 +1292,7 @@ class Application(object):
             data_dict[oid] = None
             local_var.data_list.append(oid)
         packet = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
-        for node, conn in self.cp.iterateForObject(oid, writable=True,
-                wait_ready=False):
+        for node, conn in self.cp.iterateForObject(oid, writable=True):
             try:
                 conn.ask(packet, queue=queue)
             except ConnectionClosed:

Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Wed Jan  5 18:52:25 2011
@@ -139,30 +139,34 @@ class ConnectionPool(object):
         return result
 
     @profiler_decorator
-    def getConnForCell(self, cell, wait_ready=False):
-        return self.getConnForNode(cell.getNode(), wait_ready=wait_ready)
+    def getConnForCell(self, cell):
+        return self.getConnForNode(cell.getNode())
 
-    def iterateForObject(self, object_id, readable=False, writable=False,
-            wait_ready=False):
-        """ Iterate over nodes responsible of a object by it's ID """
+    def iterateForObject(self, object_id, readable=False, writable=False):
+        """ Iterate over nodes managing an object """
         pt = self.app.getPartitionTable()
         cell_list = pt.getCellListForOID(object_id, readable, writable)
-        yielded = 0
-        if cell_list:
+        if not cell_list:
+            raise NEOStorageError('no storage available')
+        getConnForNode = self.getConnForNode
+        while cell_list:
+            new_cell_list = []
             shuffle(cell_list)
             cell_list.sort(key=self.getCellSortKey)
-            getConnForNode = self.getConnForNode
             for cell in cell_list:
                 node = cell.getNode()
-                conn = getConnForNode(node, wait_ready=wait_ready)
+                conn = getConnForNode(node)
                 if conn is not None:
-                    yielded += 1
                     yield (node, conn)
-        if not yielded:
-            raise NEOStorageError('no storage available')
+                else:
+                    new_cell_list.append(cell)
+            cell_list = new_cell_list
+            if new_cell_list:
+                # wait a bit to avoid a busy loop
+                time.sleep(1)
 
     @profiler_decorator
-    def getConnForNode(self, node, wait_ready=True):
+    def getConnForNode(self, node):
         """Return a locked connection object to a given node
         If no connection exists, create a new one"""
         if not node.isRunning():
@@ -180,9 +184,6 @@ class ConnectionPool(object):
                 # Create new connection to node
                 while True:
                     conn = self._initNodeConnection(node)
-                    if conn is NOT_READY and wait_ready:
-                        time.sleep(1)
-                        continue
                     if conn not in (None, NOT_READY):
                         self.connection_dict[uuid] = conn
                         return conn

Modified: trunk/neo/tests/client/testConnectionPool.py
==============================================================================
--- trunk/neo/tests/client/testConnectionPool.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testConnectionPool.py [iso-8859-1] Wed Jan  5 18:52:25 2011
@@ -16,7 +16,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 import unittest
-from mock import Mock
+from mock import Mock, ReturnValues
 
 from neo.tests import NeoUnitTestBase
 from neo.client.app import ConnectionPool
@@ -78,7 +78,7 @@ class ConnectionPoolTests(NeoUnitTestBas
         self.assertRaises(NEOStorageError, pool.iterateForObject(oid).next)
 
     def test_iterateForObject_connectionRefused(self):
-        # connection refused
+        # connection refused at the first try
         oid = self.getOID(1)
         node = Mock({'__repr__': 'node'})
         cell = Mock({'__repr__': 'cell', 'getNode': node})
@@ -86,11 +86,11 @@ class ConnectionPoolTests(NeoUnitTestBas
         pt = Mock({'getCellListForOID': [cell]})
         app = Mock({'getPartitionTable': pt})
         pool = ConnectionPool(app)
-        pool.getConnForNode = Mock({'__call__': None})
-        self.assertRaises(StopIteration, pool.iterateForObject(oid).next)
+        pool.getConnForNode = Mock({'__call__': ReturnValues(None, conn)})
+        self.assertEqual(list(pool.iterateForObject(oid)), [(node, conn)])
 
-    def test_iterateForObject_connectionRefused(self):
-        # connection refused
+    def test_iterateForObject_connectionAccepted(self):
+        # connection accepted
         oid = self.getOID(1)
         node = Mock({'__repr__': 'node'})
         cell = Mock({'__repr__': 'cell', 'getNode': node})




More information about the Neo-report mailing list