[Neo-report] r1819 gregory - in /trunk/neo: ./ client/handlers/ master/ master/handlers/ t...

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Feb 22 12:42:43 CET 2010


Author: gregory
Date: Mon Feb 22 12:42:43 2010
New Revision: 1819

Log:
Use nm.getIdentifiedList() instead of em.getConnectionList()

Modified:
    trunk/neo/client/handlers/master.py
    trunk/neo/master/app.py
    trunk/neo/master/handlers/administration.py
    trunk/neo/master/handlers/client.py
    trunk/neo/master/handlers/storage.py
    trunk/neo/master/verification.py
    trunk/neo/node.py
    trunk/neo/tests/client/testMasterHandler.py
    trunk/neo/tests/master/testClientHandler.py
    trunk/neo/tests/master/testElectionHandler.py
    trunk/neo/tests/master/testMasterApp.py
    trunk/neo/tests/master/testStorageHandler.py

Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -148,9 +148,9 @@
         for node_type, addr, uuid, state in node_list:
             if state != NodeStates.RUNNING:
                 # close connection to this node if no longer running
-                conn_list = self.app.em.getConnectionListByUUID(uuid)
-                if conn_list:
-                    conn = conn_list[0]
+                node = self.app.nm.getByUUID(uuid)
+                if node and node.isConnected():
+                    conn = node.getConnection()
                     conn.close()
                     if node_type == NodeTypes.STORAGE:
                         # Remove from pool connection

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -264,13 +264,10 @@
                 assign_for_notification(NodeTypes.ADMIN)
 
         # send at most one non-empty notification packet per node
-        for conn in self.em.getConnectionList():
-            if conn.getUUID() is None:
-                continue
-            node = self.nm.getByUUID(conn.getUUID())
+        for node in self.nm.getIdentifiedList():
             node_list = node_dict.get(node.getType(), [])
             if node_list:
-                conn.notify(Packets.NotifyNodeInformation(node_list))
+                node.notify(Packets.NotifyNodeInformation(node_list))
 
     def broadcastPartitionChanges(self, cell_list):
         """Broadcast a Notify Partition Changes packet."""
@@ -279,12 +276,9 @@
             return
         ptid = self.pt.setNextID()
         self.pt.log()
-        for c in self.em.getConnectionList():
-            n = self.nm.getByUUID(c.getUUID())
-            if n is None:
-                continue
-            if n.isClient() or n.isStorage() or n.isAdmin():
-                c.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
+        for node in self.nm.getIdentifiedList():
+            if node.isClient() or node.isStorage() or node.isAdmin():
+                node.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
 
     def outdateAndBroadcastPartition(self):
         " Outdate cell of non-working nodes and broadcast changes """
@@ -319,10 +313,9 @@
     def broadcastLastOID(self, oid):
         logging.debug('Broadcast last OID to storages : %s' % dump(oid))
         packet = Packets.NotifyLastOID(oid)
-        for conn in self.em.getConnectionList():
-            node = self.nm.getByUUID(conn.getUUID())
-            if node is not None and node.isStorage():
-                conn.notify(packet)
+        for node in self.nm.getIdentifiedList():
+            if node.isStorage():
+                node.notify(packet)
 
     def provideService(self):
         """
@@ -345,13 +338,11 @@
                 # If not operational, send Stop Operation packets to storage
                 # nodes and client nodes. Abort connections to client nodes.
                 logging.critical('No longer operational, stopping the service')
-                for conn in em.getConnectionList():
-                    node = nm.getByUUID(conn.getUUID())
-                    if node is not None and (node.isStorage()
-                            or node.isClient()):
-                        conn.notify(Packets.StopOperation())
+                for node in self.nm.getIdentifiedList():
+                    if node.isStorage() or node.isClient():
+                        node.notify(Packets.StopOperation())
                         if node.isClient():
-                            conn.abort()
+                            node.getConnection().abort()
 
                 # Then, go back, and restart.
                 return
@@ -457,16 +448,13 @@
 
         # change handlers
         notification_packet = Packets.NotifyClusterInformation(state)
-        for conn in em.getConnectionList():
-            node = nm.getByUUID(conn.getUUID())
-            if conn.isListening() or node is None:
-                # not identified or listening, keep the identification handler
-                continue
+        for node in self.nm.getIdentifiedList():
             if not node.isMaster():
-                conn.notify(notification_packet)
+                node.notify(notification_packet)
             if node.isAdmin() or node.isMaster():
                 # those node types keep their own handler
                 continue
+            conn = node.getConnection()
             if node.isClient():
                 if state != ClusterStates.RUNNING:
                     conn.close()
@@ -514,8 +502,8 @@
         #   corrected.
         # change handler
         handler = shutdown.ShutdownHandler(self)
-        for c in self.em.getConnectionList():
-            c.setHandler(handler)
+        for node in self.nm.getIdentifiedList():
+            node.getConnection().setHandler(handler)
 
         # wait for all transaction to be finished
         while True:
@@ -525,15 +513,12 @@
             if self.cluster_state == ClusterStates.RUNNING:
                 sys.exit("Application has been asked to shut down")
             logging.info("asking all nodes to shutdown")
-            for c in self.em.getConnectionList():
-                node = self.nm.getByUUID(c.getUUID())
-                if node is None:
-                    continue
+            for node in self.nm.getIdentifiedList():
                 notification = Packets.NotifyNodeInformation([node.asTuple()])
                 if node.isClient():
-                    c.notify(notification)
+                    node.notify(notification)
                 elif node.isStorage() or node.isMaster():
-                    c.notify(notification)
+                    node.notify(notification)
             # then shutdown
             sys.exit("Cluster has been asked to shut down")
 

Modified: trunk/neo/master/handlers/administration.py
==============================================================================
--- trunk/neo/master/handlers/administration.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/administration.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -86,24 +86,18 @@
 
         if state == NodeStates.RUNNING:
             # first make sure to have a connection to the node
-            node_conn = None
-            for node_conn in app.em.getConnectionList():
-                if node_conn.getUUID() == node.getUUID():
-                    break
-            else:
-                # no connection to the node
+            if not node.isConnected():
                 raise ProtocolError('no connection to the node')
             node.setState(state)
 
         elif state == NodeStates.DOWN and node.isStorage():
             # update it's state
             node.setState(state)
-            for storage_conn in app.em.getConnectionListByUUID(uuid):
+            if node.isConnected():
                 # notify itself so it can shutdown
-                node_list = [node.asTuple()]
-                storage_conn.notify(Packets.NotifyNodeInformation(node_list))
+                node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
                 # close to avoid handle the closure as a connection lost
-                storage_conn.abort()
+                node.getConnection().abort()
             # modify the partition table if required
             cell_list = []
             if modify_partition_table:
@@ -152,10 +146,9 @@
             node.setRunning()
         app.broadcastNodesInformation(node_list)
         # start nodes
-        for s_conn in em.getConnectionList():
-            if s_conn.getUUID() in uuid_set:
-                s_conn.notify(Packets.NotifyLastOID(app.loid))
-                s_conn.notify(Packets.StartOperation())
+        for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
+                node.notify(Packets.NotifyLastOID(app.loid))
+                node.notify(Packets.StartOperation())
         # broadcast the new partition table
         app.broadcastPartitionChanges(cell_list)
         p = Errors.Ack('node added')

Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -76,10 +76,9 @@
         # build a new set as we may not send the message to all nodes as some
         # might be not reachable at that time
         used_uuid_set = set()
-        for c in app.em.getConnectionList():
-            if c.getUUID() in uuid_set:
-                c.ask(Packets.AskLockInformation(tid), timeout=60)
-                used_uuid_set.add(c.getUUID())
+        for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
+            node.ask(Packets.AskLockInformation(tid), timeout=60)
+            used_uuid_set.add(node.getUUID())
 
         app.tm.prepare(tid, oid_list, used_uuid_set, conn.getPeerId())
 

Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -77,19 +77,16 @@
         # Transaction Finished to the initiated client node,
         # Invalidate Objects to the other client nodes, and Unlock
         # Information to relevant storage nodes.
-        for c in app.em.getConnectionList():
-            uuid = c.getUUID()
-            if uuid is not None:
-                node = app.nm.getByUUID(uuid)
-                if node.isClient():
-                    if node is t.getNode():
-                        p = Packets.AnswerTransactionFinished(tid)
-                        c.answer(p, msg_id=t.getMessageId())
-                    else:
-                        c.notify(Packets.InvalidateObjects(t.getOIDList(), tid))
-                elif node.isStorage():
-                    if uuid in t.getUUIDList():
-                        c.notify(Packets.NotifyUnlockInformation(tid))
+        for node in self.app.nm.getIdentifiedList():
+            if node.isClient():
+                if node is t.getNode():
+                    p = Packets.AnswerTransactionFinished(tid)
+                    node.answer(p, msg_id=t.getMessageId())
+                else:
+                    node.notify(Packets.InvalidateObjects(t.getOIDList(), tid))
+            elif node.isStorage():
+                if uuid in t.getUUIDList():
+                    node.notify(Packets.NotifyUnlockInformation(tid))
 
         # remove transaction from manager
         tm.remove(tid)

Modified: trunk/neo/master/verification.py
==============================================================================
--- trunk/neo/master/verification.py [iso-8859-1] (original)
+++ trunk/neo/master/verification.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -93,13 +93,10 @@
         logging.info('start to verify data')
 
         # Gather all unfinished transactions.
-        for conn in em.getConnectionList():
-            uuid = conn.getUUID()
-            if uuid is not None:
-                node = nm.getByUUID(uuid)
-                if node.isStorage():
-                    self._uuid_dict[uuid] = False
-                    conn.ask(Packets.AskUnfinishedTransactions())
+        for node in self.app.nm.getIdentifiedList():
+            if node.isStorage():
+                self._uuid_dict[node.getUUID()] = False
+                node.ask(Packets.AskUnfinishedTransactions())
 
         while True:
             em.poll(1)
@@ -116,40 +113,34 @@
             uuid_set = self.verifyTransaction(tid)
             if uuid_set is None:
                 # Make sure that no node has this transaction.
-                for conn in em.getConnectionList():
-                    uuid = conn.getUUID()
-                    if uuid is not None:
-                        node = nm.getByUUID(uuid)
-                        if node.isStorage():
-                            conn.notify(Packets.DeleteTransaction(tid))
+                for node in self.app.nm.getIdentifiedList():
+                    if node.isStorage():
+                        node.notify(Packets.DeleteTransaction(tid))
             else:
-                for conn in em.getConnectionList():
-                    uuid = conn.getUUID()
-                    if uuid in uuid_set:
-                        conn.ask(Packets.CommitTransaction(tid))
+                for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
+                    node.ask(Packets.CommitTransaction(tid))
 
             # If possible, send the packets now.
             em.poll(0)
 
     def verifyTransaction(self, tid):
         em = self.app.em
+        nm = self.app.nm
         uuid_set = set()
 
         # Determine to which nodes I should ask.
         partition = self.app.pt.getPartition(tid)
-        transaction_uuid_list = [cell.getUUID() for cell \
+        uuid_list = [cell.getUUID() for cell \
                 in self.app.pt.getCellList(partition, readable=True)]
-        if len(transaction_uuid_list) == 0:
+        if len(uuid_list) == 0:
             raise VerificationFailure
-        uuid_set.update(transaction_uuid_list)
+        uuid_set.update(uuid_list)
 
         # Gather OIDs.
         self._uuid_dict = {}
-        for conn in em.getConnectionList():
-            uuid = conn.getUUID()
-            if uuid in transaction_uuid_list:
-                self._uuid_dict[uuid] = False
-                conn.ask(Packets.AskTransactionInformation(tid))
+        for node in self.app.nm.getIdentifiedList(pool_set=uuid_list):
+            self._uuid_dict[node.getUUID()] = False
+            node.ask(Packets.AskTransactionInformation(tid))
         if len(self._uuid_dict) == 0:
             raise VerificationFailure
 
@@ -174,11 +165,9 @@
             uuid_set.update(object_uuid_list)
 
             self._object_present = True
-            for conn in em.getConnectionList():
-                uuid = conn.getUUID()
-                if uuid in object_uuid_list:
-                    self._uuid_dict[uuid] = False
-                    conn.ask(Packets.AskObjectPresent(oid, tid))
+            for node in nm.getIdentifiedList(pool_set=object_uuid_list):
+                self._uuid_dict[node.getUUID()] = False
+                node.ask(Packets.AskObjectPresent(oid, tid))
 
             while True:
                 em.poll(1)

Modified: trunk/neo/node.py
==============================================================================
--- trunk/neo/node.py [iso-8859-1] (original)
+++ trunk/neo/node.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -224,6 +224,10 @@
 
 class NodeManager(object):
     """This class manages node status."""
+
+    # TODO: rework getXXXList() methods, filter first by node type
+    # - getStorageList(identified=True, connected=True, )
+    # - getList(...)
 
     def __init__(self):
         self._node_set = set()

Modified: trunk/neo/tests/client/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -185,9 +185,19 @@
             (NodeTypes.CLIENT, addr, self.getNewUUID(), NodeStates.UNKNOWN),
             (NodeTypes.STORAGE, addr, self.getNewUUID(), NodeStates.DOWN),
         ]
+        # XXX: it might be better to test with real node & node manager
         conn1, conn2 = Mock({'__repr__': 'conn1'}), Mock({'__repr__': 'conn2'})
-        self.app.em = Mock({'getConnectionListByUUID': ReturnValues([conn1],
-                    [conn2])})
+        node1 = Mock({
+            'getConnection': conn1, 
+            '__nonzero__': 1,
+            'isConnected': True,
+        })
+        node2 = Mock({
+            'getConnection': conn2, 
+            '__nonzero__': 1,
+            'isConnected': True,
+        })
+        self.app.nm = Mock({'getByUUID': ReturnValues(node1, node2)})
         self.app.cp = Mock()
         self.handler.notifyNodeInformation(conn, node_list)
         # node manager updated

Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -31,7 +31,7 @@
         self.app = Application(config)
         self.app.pt.clear()
         self.app.pt.setID(pack('!Q', 1))
-        self.app.em = Mock({"getConnectionList" : []})
+        self.app.em = Mock()
         self.app.loid = '\0' * 8
         self.app.tm.setLastTID('\0' * 8)
         self.service = ClientServiceHandler(self.app)
@@ -117,7 +117,7 @@
         oid_list = []
         tid = self.app.tm.getLastTID()
         conn = self.getFakeConnection(client_uuid, self.client_address)
-        self.app.em = Mock({"getConnectionList" : [conn, storage_conn]})
+        self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
         service.askFinishTransaction(conn, oid_list, tid)
         self.checkAskLockInformation(storage_conn)
         self.assertEquals(len(self.app.tm.getPendingList()), 1)

Modified: trunk/neo/tests/master/testElectionHandler.py
==============================================================================
--- trunk/neo/tests/master/testElectionHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testElectionHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -43,7 +43,7 @@
         config = self.getMasterConfiguration(master_number=1)
         self.app = Application(config)
         self.app.pt.clear()
-        self.app.em = Mock({"getConnectionList" : []})
+        self.app.em = Mock()
         self.app.uuid = self._makeUUID('M')
         self.app.server = ('127.0.0.1', 10000)
         self.app.name = 'NEOCLUSTER'
@@ -209,7 +209,7 @@
         self.app = Application(config)
         self.app.pt.clear()
         self.app.name = 'NEOCLUSTER'
-        self.app.em = Mock({"getConnectionList" : []})
+        self.app.em = Mock()
         self.election = ServerElectionHandler(self.app)
         self.app.unconnected_master_node_set = set()
         self.app.negotiating_master_node_set = set()

Modified: trunk/neo/tests/master/testMasterApp.py
==============================================================================
--- trunk/neo/tests/master/testMasterApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testMasterApp.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -50,18 +50,20 @@
     def test_06_broadcastNodeInformation(self):
         # defined some nodes to which data will be send
         master_uuid = self.getNewUUID()
-        self.app.nm.createMaster(uuid=master_uuid)
+        master = self.app.nm.createMaster(uuid=master_uuid)
         storage_uuid = self.getNewUUID()
         storage = self.app.nm.createStorage(uuid=storage_uuid)
         client_uuid = self.getNewUUID()
         client = self.app.nm.createClient(uuid=client_uuid)
+        # create conn and patch em
+        master_conn = Mock()
+        storage_conn = Mock()
+        client_conn = Mock()
+        master.setConnection(master_conn)
+        storage.setConnection(storage_conn)
+        client.setConnection(client_conn)
         self.app.nm.add(storage)
         self.app.nm.add(client)
-        # create conn and patch em
-        master_conn = Mock({"getUUID" : master_uuid})
-        storage_conn = Mock({"getUUID" : storage_uuid})
-        client_conn = Mock({"getUUID" : client_uuid})
-        self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
 
         # no address defined, not send to client node
         c_node = self.app.nm.createClient(uuid = self.getNewUUID())
@@ -72,10 +74,6 @@
         self.checkNotifyNodeInformation(storage_conn)
 
         # address defined and client type
-        master_conn = Mock({"getUUID" : master_uuid})
-        storage_conn = Mock({"getUUID" : storage_uuid})
-        client_conn = Mock({"getUUID" : client_uuid})
-        self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
         s_node = self.app.nm.createClient(
             uuid = self.getNewUUID(),
             address=("127.1.0.1", 3361)
@@ -87,10 +85,6 @@
         self.checkNotifyNodeInformation(storage_conn)
 
         # address defined and storage type
-        master_conn = Mock({"getUUID" : master_uuid})
-        storage_conn = Mock({"getUUID" : storage_uuid})
-        client_conn = Mock({"getUUID" : client_uuid})
-        self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
         s_node = self.app.nm.createStorage(
             uuid=self.getNewUUID(),
             address=("127.0.0.1", 1351)

Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Mon Feb 22 12:42:43 2010
@@ -34,7 +34,7 @@
         self.app = Application(config)
         self.app.pt.clear()
         self.app.pt.setID(pack('!Q', 1))
-        self.app.em = Mock({"getConnectionList" : []})
+        self.app.em = Mock()
         self.service = StorageServiceHandler(self.app)
         self.client_handler = ClientServiceHandler(self.app)
         # define some variable to simulate client and storage node
@@ -72,6 +72,7 @@
         node = nm.createFromNodeType(node_type, address=(ip, port),
                 uuid=uuid)
         conn = self.getFakeConnection(node.getUUID(),node.getAddress())
+        node.setConnection(conn)
         return (node, conn)
 
     def test_answerInformationLocked_1(self):
@@ -106,7 +107,6 @@
         # a faked event manager
         connection_list = [client_conn_1, client_conn_2, storage_conn_1,
                 storage_conn_2]
-        self.app.em = Mock({"getConnectionList" : connection_list})
         # register a transaction
         tid = self.app.tm.begin(client_1, None)
         self.app.tm.prepare(tid, oid_list, uuid_list, msg_id)





More information about the Neo-report mailing list