[Neo-report] r2173 vincent - in /trunk/neo: client/ tests/client/
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Jun 17 17:57:51 CEST 2010
Author: vincent
Date: Thu Jun 17 17:57:49 2010
New Revision: 2173
Log:
Sort cell list after randomising it.
There are 2 objectives:
- Avoid randomly picking an unresponsive storage node to connect to, which
impairs performance a lot. Note that this happens only when the master
has not noticed the disconnection, so the node is still in running state in
the node manager.
- Increase connection reuse, saving the cost of establishing a new
connection and a slot in connection pool.
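Randomisation is kept to even out storage node use: the list is still
shuffled first, and since the sort is stable, cells with the same sort key
stay in their shuffled order.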
Modified:
trunk/neo/client/app.py
trunk/neo/client/pool.py
trunk/neo/tests/client/testClientApp.py
trunk/neo/tests/client/testConnectionPool.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Thu Jun 17 17:57:49 2010
@@ -447,6 +447,7 @@ class Application(object):
raise NEOStorageNotFoundError()
shuffle(cell_list)
+ cell_list.sort(key=self.cp.getCellSortKey)
self.local_var.asked_object = 0
for cell in cell_list:
logging.debug('trying to load %s from %s',
@@ -859,6 +860,7 @@ class Application(object):
assert len(cell_list), 'No cell found for transaction %s' % (
dump(undone_tid), )
shuffle(cell_list)
+ cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list:
conn = self.cp.getConnForCell(cell)
if conn is None:
@@ -976,6 +978,7 @@ class Application(object):
for tid in ordered_tids:
cell_list = self._getCellListForTID(tid, readable=True)
shuffle(cell_list)
+ cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list:
conn = self.cp.getConnForCell(cell)
if conn is not None:
@@ -1021,7 +1024,7 @@ class Application(object):
# Get history informations for object first
cell_list = self._getCellListForOID(oid, readable=True)
shuffle(cell_list)
-
+ cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list:
conn = self.cp.getConnForCell(cell)
if conn is None:
@@ -1060,7 +1063,7 @@ class Application(object):
for serial, size in self.local_var.history[1]:
self._getCellListForTID(serial, readable=True)
shuffle(cell_list)
-
+ cell_list.sort(key=self.cp.getCellSortKey)
for cell in cell_list:
conn = self.cp.getConnForCell(cell)
if conn is None:
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Thu Jun 17 17:57:49 2010
@@ -22,6 +22,19 @@ from neo.protocol import NodeTypes, Pack
from neo.connection import MTClientConnection
from neo.client.exception import ConnectionClosed
from neo.profiling import profiler_decorator
+import time
+
+# How long before we might retry a connection to a node to which connection
+# failed in the past.
+MAX_FAILURE_AGE = 600
+
+# Cell list sort keys
+# We are connected to storage node hosting cell, high priority
+CELL_CONNECTED = -1
+# normal priority
+CELL_GOOD = 0
+# Storage node hosting cell failed recently, low priority
+CELL_FAILED = 1
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
@@ -36,6 +49,7 @@ class ConnectionPool(object):
l = RLock()
self.connection_lock_acquire = l.acquire
self.connection_lock_release = l.release
+ self.node_failure_dict = {}
@profiler_decorator
def _initNodeConnection(self, node):
@@ -59,6 +73,7 @@ class ConnectionPool(object):
if conn.getConnector() is None:
# This happens, if a connection could not be established.
logging.error('Connection to %r failed', node)
+ self.notifyFailure(node)
return None
p = Packets.RequestIdentification(NodeTypes.CLIENT,
@@ -72,6 +87,7 @@ class ConnectionPool(object):
handler=app.storage_bootstrap_handler)
except ConnectionClosed:
logging.error('Connection to %r failed', node)
+ self.notifyFailure(node)
return None
if app.isNodeReady():
@@ -79,6 +95,7 @@ class ConnectionPool(object):
return conn
else:
logging.info('%r not ready', node)
+ self.notifyFailure(node)
return None
@profiler_decorator
@@ -112,6 +129,28 @@ class ConnectionPool(object):
return conn
@profiler_decorator
+ def notifyFailure(self, node):
+ self._notifyFailure(node.getUUID(), time.time() + MAX_FAILURE_AGE)
+
+ def _notifyFailure(self, uuid, at):
+ self.node_failure_dict[uuid] = at
+
+ @profiler_decorator
+ def getCellSortKey(self, cell):
+ return self._getCellSortKey(cell.getUUID(), time.time())
+
+ def _getCellSortKey(self, uuid, now):
+ if uuid in self.connection_dict:
+ result = CELL_CONNECTED
+ else:
+ failure = self.node_failure_dict.get(uuid)
+ if failure is None or failure < now:
+ result = CELL_GOOD
+ else:
+ result = CELL_FAILED
+ return result
+
+ @profiler_decorator
def getConnForCell(self, cell):
return self.getConnForNode(cell.getNode())
Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Thu Jun 17 17:57:49 2010
@@ -195,21 +195,21 @@ class ClientApplicationTests(NeoTestBase
tid = self.makeTID()
# cache cleared
self.assertTrue(oid not in mq)
- app.pt = Mock({ 'getCellListForOID': (), })
+ app.pt = Mock({ 'getCellListForOID': [], })
app.local_var.history = (oid, [(tid, 0)])
# If object len is 0, this object doesn't exist anymore because its
# creation has been undone.
self.assertRaises(KeyError, app.getSerial, oid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1)
# Otherwise, result from ZODB
- app.pt = Mock({ 'getCellListForOID': (), })
+ app.pt = Mock({ 'getCellListForOID': [], })
app.local_var.history = (oid, [(tid, 1)])
self.assertEquals(app.getSerial(oid), tid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1)
# fill the cache -> hit
mq.store(oid, (tid, ' '))
self.assertTrue(oid in mq)
- app.pt = Mock({ 'getCellListForOID': (), })
+ app.pt = Mock({ 'getCellListForOID': [], })
app.getSerial(oid)
self.assertEquals(app.getSerial(oid), tid)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellListForOID')), 0)
@@ -231,7 +231,7 @@ class ClientApplicationTests(NeoTestBase
'fakeReceived': packet,
})
app.local_var.queue = Mock({'get' : (conn, None)})
- app.pt = Mock({ 'getCellListForOID': (cell, ), })
+ app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1
Application._waitMessage = self._waitMessage
@@ -247,7 +247,7 @@ class ClientApplicationTests(NeoTestBase
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
- app.pt = Mock({ 'getCellListForOID': (cell, ), })
+ app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.load, oid)
@@ -289,7 +289,7 @@ class ClientApplicationTests(NeoTestBase
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
- app.pt = Mock({ 'getCellListForOID': (cell, ), })
+ app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.loadSerial, oid, tid2)
@@ -329,7 +329,7 @@ class ClientApplicationTests(NeoTestBase
'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet,
})
- app.pt = Mock({ 'getCellListForOID': (cell, ), })
+ app.pt = Mock({ 'getCellListForOID': [cell, ], })
app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1
self.assertRaises(NEOStorageNotFoundError, app.loadBefore, oid, tid2)
@@ -772,8 +772,8 @@ class ClientApplicationTests(NeoTestBase
'getState': 'FakeState',
})
app.pt = Mock({
- 'getCellListForTID': (cell, ),
- 'getCellListForOID': (cell, ),
+ 'getCellListForTID': [cell, ],
+ 'getCellListForOID': [cell, ],
})
app.cp = Mock({'getConnForCell': conn, 'getConnForNode': conn})
def tryToResolveConflict(oid, conflict_serial, serial, data,
Modified: trunk/neo/tests/client/testConnectionPool.py
==============================================================================
--- trunk/neo/tests/client/testConnectionPool.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testConnectionPool.py [iso-8859-1] Thu Jun 17 17:57:49 2010
@@ -48,6 +48,26 @@ class ConnectionPoolTests(NeoTestBase):
# TODO: test getConnForNode (requires splitting complex functionalities)
+ def test_CellSortKey(self):
+ pool = ConnectionPool(None)
+ node_uuid_1 = self.getNewUUID()
+ node_uuid_2 = self.getNewUUID()
+ node_uuid_3 = self.getNewUUID()
+ # We are connected to node 1
+ pool.connection_dict[node_uuid_1] = None
+ # A connection to node 3 failed, will be forgotten at 5
+ pool._notifyFailure(node_uuid_3, 5)
+ getCellSortKey = pool._getCellSortKey
+
+ # At 0, key values are not ambiguous
+ self.assertTrue(getCellSortKey(node_uuid_1, 0) < getCellSortKey(
+ node_uuid_2, 0) < getCellSortKey(node_uuid_3, 0))
+ # At 10, nodes 2 and 3 have the same key value
+ self.assertTrue(getCellSortKey(node_uuid_1, 10) < getCellSortKey(
+ node_uuid_2, 10))
+ self.assertEqual(getCellSortKey(node_uuid_2, 10), getCellSortKey(
+ node_uuid_3, 10))
+
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list