[Neo-report] r1819 gregory - in /trunk/neo: ./ client/handlers/ master/ master/handlers/ t...
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Feb 22 12:42:43 CET 2010
Author: gregory
Date: Mon Feb 22 12:42:43 2010
New Revision: 1819
Log:
Use nm.getIdentifiedList() instead of em.getConnectionList()
Modified:
trunk/neo/client/handlers/master.py
trunk/neo/master/app.py
trunk/neo/master/handlers/administration.py
trunk/neo/master/handlers/client.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/verification.py
trunk/neo/node.py
trunk/neo/tests/client/testMasterHandler.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testElectionHandler.py
trunk/neo/tests/master/testMasterApp.py
trunk/neo/tests/master/testStorageHandler.py
Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -148,9 +148,9 @@
for node_type, addr, uuid, state in node_list:
if state != NodeStates.RUNNING:
# close connection to this node if no longer running
- conn_list = self.app.em.getConnectionListByUUID(uuid)
- if conn_list:
- conn = conn_list[0]
+ node = self.app.nm.getByUUID(uuid)
+ if node and node.isConnected():
+ conn = node.getConnection()
conn.close()
if node_type == NodeTypes.STORAGE:
# Remove from pool connection
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -264,13 +264,10 @@
assign_for_notification(NodeTypes.ADMIN)
# send at most one non-empty notification packet per node
- for conn in self.em.getConnectionList():
- if conn.getUUID() is None:
- continue
- node = self.nm.getByUUID(conn.getUUID())
+ for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType(), [])
if node_list:
- conn.notify(Packets.NotifyNodeInformation(node_list))
+ node.notify(Packets.NotifyNodeInformation(node_list))
def broadcastPartitionChanges(self, cell_list):
"""Broadcast a Notify Partition Changes packet."""
@@ -279,12 +276,9 @@
return
ptid = self.pt.setNextID()
self.pt.log()
- for c in self.em.getConnectionList():
- n = self.nm.getByUUID(c.getUUID())
- if n is None:
- continue
- if n.isClient() or n.isStorage() or n.isAdmin():
- c.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
+ for node in self.nm.getIdentifiedList():
+ if node.isClient() or node.isStorage() or node.isAdmin():
+ node.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
def outdateAndBroadcastPartition(self):
" Outdate cell of non-working nodes and broadcast changes """
@@ -319,10 +313,9 @@
def broadcastLastOID(self, oid):
logging.debug('Broadcast last OID to storages : %s' % dump(oid))
packet = Packets.NotifyLastOID(oid)
- for conn in self.em.getConnectionList():
- node = self.nm.getByUUID(conn.getUUID())
- if node is not None and node.isStorage():
- conn.notify(packet)
+ for node in self.nm.getIdentifiedList():
+ if node.isStorage():
+ node.notify(packet)
def provideService(self):
"""
@@ -345,13 +338,11 @@
# If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes.
logging.critical('No longer operational, stopping the service')
- for conn in em.getConnectionList():
- node = nm.getByUUID(conn.getUUID())
- if node is not None and (node.isStorage()
- or node.isClient()):
- conn.notify(Packets.StopOperation())
+ for node in self.nm.getIdentifiedList():
+ if node.isStorage() or node.isClient():
+ node.notify(Packets.StopOperation())
if node.isClient():
- conn.abort()
+ node.getConnection().abort()
# Then, go back, and restart.
return
@@ -457,16 +448,13 @@
# change handlers
notification_packet = Packets.NotifyClusterInformation(state)
- for conn in em.getConnectionList():
- node = nm.getByUUID(conn.getUUID())
- if conn.isListening() or node is None:
- # not identified or listening, keep the identification handler
- continue
+ for node in self.nm.getIdentifiedList():
if not node.isMaster():
- conn.notify(notification_packet)
+ node.notify(notification_packet)
if node.isAdmin() or node.isMaster():
# those node types keep their own handler
continue
+ conn = node.getConnection()
if node.isClient():
if state != ClusterStates.RUNNING:
conn.close()
@@ -514,8 +502,8 @@
# corrected.
# change handler
handler = shutdown.ShutdownHandler(self)
- for c in self.em.getConnectionList():
- c.setHandler(handler)
+ for node in self.nm.getIdentifiedList():
+ node.getConnection().setHandler(handler)
# wait for all transaction to be finished
while True:
@@ -525,15 +513,12 @@
if self.cluster_state == ClusterStates.RUNNING:
sys.exit("Application has been asked to shut down")
logging.info("asking all nodes to shutdown")
- for c in self.em.getConnectionList():
- node = self.nm.getByUUID(c.getUUID())
- if node is None:
- continue
+ for node in self.nm.getIdentifiedList():
notification = Packets.NotifyNodeInformation([node.asTuple()])
if node.isClient():
- c.notify(notification)
+ node.notify(notification)
elif node.isStorage() or node.isMaster():
- c.notify(notification)
+ node.notify(notification)
# then shutdown
sys.exit("Cluster has been asked to shut down")
Modified: trunk/neo/master/handlers/administration.py
==============================================================================
--- trunk/neo/master/handlers/administration.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/administration.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -86,24 +86,18 @@
if state == NodeStates.RUNNING:
# first make sure to have a connection to the node
- node_conn = None
- for node_conn in app.em.getConnectionList():
- if node_conn.getUUID() == node.getUUID():
- break
- else:
- # no connection to the node
+ if not node.isConnected():
raise ProtocolError('no connection to the node')
node.setState(state)
elif state == NodeStates.DOWN and node.isStorage():
# update it's state
node.setState(state)
- for storage_conn in app.em.getConnectionListByUUID(uuid):
+ if node.isConnected():
# notify itself so it can shutdown
- node_list = [node.asTuple()]
- storage_conn.notify(Packets.NotifyNodeInformation(node_list))
+ node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
# close to avoid handle the closure as a connection lost
- storage_conn.abort()
+ node.getConnection().abort()
# modify the partition table if required
cell_list = []
if modify_partition_table:
@@ -152,10 +146,9 @@
node.setRunning()
app.broadcastNodesInformation(node_list)
# start nodes
- for s_conn in em.getConnectionList():
- if s_conn.getUUID() in uuid_set:
- s_conn.notify(Packets.NotifyLastOID(app.loid))
- s_conn.notify(Packets.StartOperation())
+ for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
+ node.notify(Packets.NotifyLastOID(app.loid))
+ node.notify(Packets.StartOperation())
# broadcast the new partition table
app.broadcastPartitionChanges(cell_list)
p = Errors.Ack('node added')
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -76,10 +76,9 @@
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
used_uuid_set = set()
- for c in app.em.getConnectionList():
- if c.getUUID() in uuid_set:
- c.ask(Packets.AskLockInformation(tid), timeout=60)
- used_uuid_set.add(c.getUUID())
+ for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
+ node.ask(Packets.AskLockInformation(tid), timeout=60)
+ used_uuid_set.add(node.getUUID())
app.tm.prepare(tid, oid_list, used_uuid_set, conn.getPeerId())
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -77,19 +77,16 @@
# Transaction Finished to the initiated client node,
# Invalidate Objects to the other client nodes, and Unlock
# Information to relevant storage nodes.
- for c in app.em.getConnectionList():
- uuid = c.getUUID()
- if uuid is not None:
- node = app.nm.getByUUID(uuid)
- if node.isClient():
- if node is t.getNode():
- p = Packets.AnswerTransactionFinished(tid)
- c.answer(p, msg_id=t.getMessageId())
- else:
- c.notify(Packets.InvalidateObjects(t.getOIDList(), tid))
- elif node.isStorage():
- if uuid in t.getUUIDList():
- c.notify(Packets.NotifyUnlockInformation(tid))
+ for node in self.app.nm.getIdentifiedList():
+ if node.isClient():
+ if node is t.getNode():
+ p = Packets.AnswerTransactionFinished(tid)
+ node.answer(p, msg_id=t.getMessageId())
+ else:
+ node.notify(Packets.InvalidateObjects(t.getOIDList(), tid))
+ elif node.isStorage():
+ if uuid in t.getUUIDList():
+ node.notify(Packets.NotifyUnlockInformation(tid))
# remove transaction from manager
tm.remove(tid)
Modified: trunk/neo/master/verification.py
==============================================================================
--- trunk/neo/master/verification.py [iso-8859-1] (original)
+++ trunk/neo/master/verification.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -93,13 +93,10 @@
logging.info('start to verify data')
# Gather all unfinished transactions.
- for conn in em.getConnectionList():
- uuid = conn.getUUID()
- if uuid is not None:
- node = nm.getByUUID(uuid)
- if node.isStorage():
- self._uuid_dict[uuid] = False
- conn.ask(Packets.AskUnfinishedTransactions())
+ for node in self.app.nm.getIdentifiedList():
+ if node.isStorage():
+ self._uuid_dict[node.getUUID()] = False
+ node.ask(Packets.AskUnfinishedTransactions())
while True:
em.poll(1)
@@ -116,40 +113,34 @@
uuid_set = self.verifyTransaction(tid)
if uuid_set is None:
# Make sure that no node has this transaction.
- for conn in em.getConnectionList():
- uuid = conn.getUUID()
- if uuid is not None:
- node = nm.getByUUID(uuid)
- if node.isStorage():
- conn.notify(Packets.DeleteTransaction(tid))
+ for node in self.app.nm.getIdentifiedList():
+ if node.isStorage():
+ node.notify(Packets.DeleteTransaction(tid))
else:
- for conn in em.getConnectionList():
- uuid = conn.getUUID()
- if uuid in uuid_set:
- conn.ask(Packets.CommitTransaction(tid))
+ for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
+ node.ask(Packets.CommitTransaction(tid))
# If possible, send the packets now.
em.poll(0)
def verifyTransaction(self, tid):
em = self.app.em
+ nm = self.app.nm
uuid_set = set()
# Determine to which nodes I should ask.
partition = self.app.pt.getPartition(tid)
- transaction_uuid_list = [cell.getUUID() for cell \
+ uuid_list = [cell.getUUID() for cell \
in self.app.pt.getCellList(partition, readable=True)]
- if len(transaction_uuid_list) == 0:
+ if len(uuid_list) == 0:
raise VerificationFailure
- uuid_set.update(transaction_uuid_list)
+ uuid_set.update(uuid_list)
# Gather OIDs.
self._uuid_dict = {}
- for conn in em.getConnectionList():
- uuid = conn.getUUID()
- if uuid in transaction_uuid_list:
- self._uuid_dict[uuid] = False
- conn.ask(Packets.AskTransactionInformation(tid))
+ for node in self.app.nm.getIdentifiedList(pool_set=uuid_list):
+ self._uuid_dict[node.getUUID()] = False
+ node.ask(Packets.AskTransactionInformation(tid))
if len(self._uuid_dict) == 0:
raise VerificationFailure
@@ -174,11 +165,9 @@
uuid_set.update(object_uuid_list)
self._object_present = True
- for conn in em.getConnectionList():
- uuid = conn.getUUID()
- if uuid in object_uuid_list:
- self._uuid_dict[uuid] = False
- conn.ask(Packets.AskObjectPresent(oid, tid))
+ for node in nm.getIdentifiedList(pool_set=object_uuid_list):
+ self._uuid_dict[node.getUUID()] = False
+ node.ask(Packets.AskObjectPresent(oid, tid))
while True:
em.poll(1)
Modified: trunk/neo/node.py
==============================================================================
--- trunk/neo/node.py [iso-8859-1] (original)
+++ trunk/neo/node.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -224,6 +224,10 @@
class NodeManager(object):
"""This class manages node status."""
+
+ # TODO: rework getXXXList() methods, filter first by node type
+ # - getStorageList(identified=True, connected=True, )
+ # - getList(...)
def __init__(self):
self._node_set = set()
Modified: trunk/neo/tests/client/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -185,9 +185,19 @@
(NodeTypes.CLIENT, addr, self.getNewUUID(), NodeStates.UNKNOWN),
(NodeTypes.STORAGE, addr, self.getNewUUID(), NodeStates.DOWN),
]
+ # XXX: it might be better to test with real node & node manager
conn1, conn2 = Mock({'__repr__': 'conn1'}), Mock({'__repr__': 'conn2'})
- self.app.em = Mock({'getConnectionListByUUID': ReturnValues([conn1],
- [conn2])})
+ node1 = Mock({
+ 'getConnection': conn1,
+ '__nonzero__': 1,
+ 'isConnected': True,
+ })
+ node2 = Mock({
+ 'getConnection': conn2,
+ '__nonzero__': 1,
+ 'isConnected': True,
+ })
+ self.app.nm = Mock({'getByUUID': ReturnValues(node1, node2)})
self.app.cp = Mock()
self.handler.notifyNodeInformation(conn, node_list)
# node manager updated
Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -31,7 +31,7 @@
self.app = Application(config)
self.app.pt.clear()
self.app.pt.setID(pack('!Q', 1))
- self.app.em = Mock({"getConnectionList" : []})
+ self.app.em = Mock()
self.app.loid = '\0' * 8
self.app.tm.setLastTID('\0' * 8)
self.service = ClientServiceHandler(self.app)
@@ -117,7 +117,7 @@
oid_list = []
tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address)
- self.app.em = Mock({"getConnectionList" : [conn, storage_conn]})
+ self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
service.askFinishTransaction(conn, oid_list, tid)
self.checkAskLockInformation(storage_conn)
self.assertEquals(len(self.app.tm.getPendingList()), 1)
Modified: trunk/neo/tests/master/testElectionHandler.py
==============================================================================
--- trunk/neo/tests/master/testElectionHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testElectionHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -43,7 +43,7 @@
config = self.getMasterConfiguration(master_number=1)
self.app = Application(config)
self.app.pt.clear()
- self.app.em = Mock({"getConnectionList" : []})
+ self.app.em = Mock()
self.app.uuid = self._makeUUID('M')
self.app.server = ('127.0.0.1', 10000)
self.app.name = 'NEOCLUSTER'
@@ -209,7 +209,7 @@
self.app = Application(config)
self.app.pt.clear()
self.app.name = 'NEOCLUSTER'
- self.app.em = Mock({"getConnectionList" : []})
+ self.app.em = Mock()
self.election = ServerElectionHandler(self.app)
self.app.unconnected_master_node_set = set()
self.app.negotiating_master_node_set = set()
Modified: trunk/neo/tests/master/testMasterApp.py
==============================================================================
--- trunk/neo/tests/master/testMasterApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testMasterApp.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -50,18 +50,20 @@
def test_06_broadcastNodeInformation(self):
# defined some nodes to which data will be send
master_uuid = self.getNewUUID()
- self.app.nm.createMaster(uuid=master_uuid)
+ master = self.app.nm.createMaster(uuid=master_uuid)
storage_uuid = self.getNewUUID()
storage = self.app.nm.createStorage(uuid=storage_uuid)
client_uuid = self.getNewUUID()
client = self.app.nm.createClient(uuid=client_uuid)
+ # create conn and patch em
+ master_conn = Mock()
+ storage_conn = Mock()
+ client_conn = Mock()
+ master.setConnection(master_conn)
+ storage.setConnection(storage_conn)
+ client.setConnection(client_conn)
self.app.nm.add(storage)
self.app.nm.add(client)
- # create conn and patch em
- master_conn = Mock({"getUUID" : master_uuid})
- storage_conn = Mock({"getUUID" : storage_uuid})
- client_conn = Mock({"getUUID" : client_uuid})
- self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
# no address defined, not send to client node
c_node = self.app.nm.createClient(uuid = self.getNewUUID())
@@ -72,10 +74,6 @@
self.checkNotifyNodeInformation(storage_conn)
# address defined and client type
- master_conn = Mock({"getUUID" : master_uuid})
- storage_conn = Mock({"getUUID" : storage_uuid})
- client_conn = Mock({"getUUID" : client_uuid})
- self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
s_node = self.app.nm.createClient(
uuid = self.getNewUUID(),
address=("127.1.0.1", 3361)
@@ -87,10 +85,6 @@
self.checkNotifyNodeInformation(storage_conn)
# address defined and storage type
- master_conn = Mock({"getUUID" : master_uuid})
- storage_conn = Mock({"getUUID" : storage_uuid})
- client_conn = Mock({"getUUID" : client_uuid})
- self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
s_node = self.app.nm.createStorage(
uuid=self.getNewUUID(),
address=("127.0.0.1", 1351)
Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -34,7 +34,7 @@
self.app = Application(config)
self.app.pt.clear()
self.app.pt.setID(pack('!Q', 1))
- self.app.em = Mock({"getConnectionList" : []})
+ self.app.em = Mock()
self.service = StorageServiceHandler(self.app)
self.client_handler = ClientServiceHandler(self.app)
# define some variable to simulate client and storage node
@@ -72,6 +72,7 @@
node = nm.createFromNodeType(node_type, address=(ip, port),
uuid=uuid)
conn = self.getFakeConnection(node.getUUID(),node.getAddress())
+ node.setConnection(conn)
return (node, conn)
def test_answerInformationLocked_1(self):
@@ -106,7 +107,6 @@
# a faked event manager
connection_list = [client_conn_1, client_conn_2, storage_conn_1,
storage_conn_2]
- self.app.em = Mock({"getConnectionList" : connection_list})
# register a transaction
tid = self.app.tm.begin(client_1, None)
self.app.tm.prepare(tid, oid_list, uuid_list, msg_id)
More information about the Neo-report
mailing list