[Neo-report] r2591 gregory - in /trunk/neo: client/ tests/client/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Jan 5 18:52:25 CET 2011
Author: gregory
Date: Wed Jan 5 18:52:25 2011
New Revision: 2591
Log:
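Make iterateForObject always yield a connection for each node.
Don't block on a node that is not ready, but don't skip it either: retry it after the other nodes.
Fix the duplicated test method name and update the test (the iterator no longer raises StopIteration when a connection is refused).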
Modified:
trunk/neo/client/app.py
trunk/neo/client/pool.py
trunk/neo/tests/client/testConnectionPool.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Jan 5 18:52:25 2011
@@ -622,31 +622,28 @@ class Application(object):
def _loadFromStorage(self, oid, at_tid, before_tid):
self.local_var.asked_object = 0
packet = Packets.AskObject(oid, at_tid, before_tid)
- while self.local_var.asked_object == 0:
- # try without waiting for a node to be ready
- for node, conn in self.cp.iterateForObject(oid, readable=True,
- wait_ready=False):
- try:
- self._askStorage(conn, packet)
- except ConnectionClosed:
- continue
-
- # Check data
- noid, tid, next_tid, compression, checksum, data \
- = self.local_var.asked_object
- if noid != oid:
- # Oops, try with next node
- neo.logging.error('got wrong oid %s instead of %s from %s',
- noid, dump(oid), conn)
- self.local_var.asked_object = -1
- continue
- elif checksum != makeChecksum(data):
- # Check checksum.
- neo.logging.error('wrong checksum from %s for oid %s',
- conn, dump(oid))
- self.local_var.asked_object = -1
- continue
- break
+ for node, conn in self.cp.iterateForObject(oid, readable=True):
+ try:
+ self._askStorage(conn, packet)
+ except ConnectionClosed:
+ continue
+
+ # Check data
+ noid, tid, next_tid, compression, checksum, data \
+ = self.local_var.asked_object
+ if noid != oid:
+ # Oops, try with next node
+ neo.logging.error('got wrong oid %s instead of %s from %s',
+ noid, dump(oid), conn)
+ self.local_var.asked_object = -1
+ continue
+ elif checksum != makeChecksum(data):
+ # Check checksum.
+ neo.logging.error('wrong checksum from %s for oid %s',
+ conn, dump(oid))
+ self.local_var.asked_object = -1
+ continue
+ break
if self.local_var.asked_object == -1:
raise NEOStorageError('inconsistent data')
@@ -735,8 +732,7 @@ class Application(object):
add_involved_nodes = self.local_var.involved_nodes.add
packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, self.local_var.tid)
- for node, conn in self.cp.iterateForObject(oid, writable=True,
- wait_ready=True):
+ for node, conn in self.cp.iterateForObject(oid, writable=True):
try:
conn.ask(packet, on_timeout=on_timeout, queue=queue)
add_involved_nodes(node)
@@ -865,8 +861,7 @@ class Application(object):
str(transaction.description), dumps(transaction._extension),
local_var.data_list)
add_involved_nodes = self.local_var.involved_nodes.add
- for node, conn in self.cp.iterateForObject(tid, writable=True,
- wait_ready=False):
+ for node, conn in self.cp.iterateForObject(tid, writable=True):
neo.logging.debug("voting object %s on %s", dump(tid),
dump(conn.getUUID()))
try:
@@ -1011,7 +1006,7 @@ class Application(object):
cell_list = getCellList(partition, readable=True)
shuffle(cell_list)
cell_list.sort(key=getCellSortKey)
- storage_conn = getConnForCell(cell_list[0], wait_ready=False)
+ storage_conn = getConnForCell(cell_list[0])
storage_conn.ask(Packets.AskObjectUndoSerial(self.local_var.tid,
snapshot_tid, undone_tid, oid_list), queue=queue)
@@ -1064,8 +1059,7 @@ class Application(object):
def _getTransactionInformation(self, tid):
packet = Packets.AskTransactionInformation(tid)
- for node, conn in self.cp.iterateForObject(tid, readable=True,
- wait_ready=False):
+ for node, conn in self.cp.iterateForObject(tid, readable=True):
try:
self._askStorage(conn, packet)
except ConnectionClosed:
@@ -1162,8 +1156,7 @@ class Application(object):
def history(self, oid, version=None, size=1, filter=None):
# Get history informations for object first
packet = Packets.AskObjectHistory(oid, 0, size)
- for node, conn in self.cp.iterateForObject(oid, readable=True,
- wait_ready=False):
+ for node, conn in self.cp.iterateForObject(oid, readable=True):
# FIXME: we keep overwriting self.local_var.history here, we
# should aggregate it instead.
self.local_var.history = None
@@ -1299,8 +1292,7 @@ class Application(object):
data_dict[oid] = None
local_var.data_list.append(oid)
packet = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
- for node, conn in self.cp.iterateForObject(oid, writable=True,
- wait_ready=False):
+ for node, conn in self.cp.iterateForObject(oid, writable=True):
try:
conn.ask(packet, queue=queue)
except ConnectionClosed:
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Wed Jan 5 18:52:25 2011
@@ -139,30 +139,34 @@ class ConnectionPool(object):
return result
@profiler_decorator
- def getConnForCell(self, cell, wait_ready=False):
- return self.getConnForNode(cell.getNode(), wait_ready=wait_ready)
+ def getConnForCell(self, cell):
+ return self.getConnForNode(cell.getNode())
- def iterateForObject(self, object_id, readable=False, writable=False,
- wait_ready=False):
- """ Iterate over nodes responsible of a object by it's ID """
+ def iterateForObject(self, object_id, readable=False, writable=False):
+ """ Iterate over nodes managing an object """
pt = self.app.getPartitionTable()
cell_list = pt.getCellListForOID(object_id, readable, writable)
- yielded = 0
- if cell_list:
+ if not cell_list:
+ raise NEOStorageError('no storage available')
+ getConnForNode = self.getConnForNode
+ while cell_list:
+ new_cell_list = []
shuffle(cell_list)
cell_list.sort(key=self.getCellSortKey)
- getConnForNode = self.getConnForNode
for cell in cell_list:
node = cell.getNode()
- conn = getConnForNode(node, wait_ready=wait_ready)
+ conn = getConnForNode(node)
if conn is not None:
- yielded += 1
yield (node, conn)
- if not yielded:
- raise NEOStorageError('no storage available')
+ else:
+ new_cell_list.append(cell)
+ cell_list = new_cell_list
+ if new_cell_list:
+ # wait a bit to avoid a busy loop
+ time.sleep(1)
@profiler_decorator
- def getConnForNode(self, node, wait_ready=True):
+ def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
if not node.isRunning():
@@ -180,9 +184,6 @@ class ConnectionPool(object):
# Create new connection to node
while True:
conn = self._initNodeConnection(node)
- if conn is NOT_READY and wait_ready:
- time.sleep(1)
- continue
if conn not in (None, NOT_READY):
self.connection_dict[uuid] = conn
return conn
Modified: trunk/neo/tests/client/testConnectionPool.py
==============================================================================
--- trunk/neo/tests/client/testConnectionPool.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testConnectionPool.py [iso-8859-1] Wed Jan 5 18:52:25 2011
@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import unittest
-from mock import Mock
+from mock import Mock, ReturnValues
from neo.tests import NeoUnitTestBase
from neo.client.app import ConnectionPool
@@ -78,7 +78,7 @@ class ConnectionPoolTests(NeoUnitTestBas
self.assertRaises(NEOStorageError, pool.iterateForObject(oid).next)
def test_iterateForObject_connectionRefused(self):
- # connection refused
+ # connection refused at the first try
oid = self.getOID(1)
node = Mock({'__repr__': 'node'})
cell = Mock({'__repr__': 'cell', 'getNode': node})
@@ -86,11 +86,11 @@ class ConnectionPoolTests(NeoUnitTestBas
pt = Mock({'getCellListForOID': [cell]})
app = Mock({'getPartitionTable': pt})
pool = ConnectionPool(app)
- pool.getConnForNode = Mock({'__call__': None})
- self.assertRaises(StopIteration, pool.iterateForObject(oid).next)
+ pool.getConnForNode = Mock({'__call__': ReturnValues(None, conn)})
+ self.assertEqual(list(pool.iterateForObject(oid)), [(node, conn)])
- def test_iterateForObject_connectionRefused(self):
- # connection refused
+ def test_iterateForObject_connectionAccepted(self):
+ # connection accepted
oid = self.getOID(1)
node = Mock({'__repr__': 'node'})
cell = Mock({'__repr__': 'cell', 'getNode': node})
More information about the Neo-report
mailing list