[Neo-report] r2613 gregory - in /trunk/neo: ./ master/ master/handlers/ storage/handlers/ ...

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Jan 11 16:49:16 CET 2011


Author: gregory
Date: Tue Jan 11 16:49:16 2011
New Revision: 2613

Log:
Master transaction manager uses TTID as index.

- AnswerInformationLocked gives ttid instead of tid
- Master transaction manager always uses ttid in data structures
- It no longer makes sense to check whether the tid is greater than the last
generated one, as it never comes back from a storage; just check that the ttid
is known by the transaction manager.
- Rename all tid variables that now hold a ttid
- Transaction manager's queue contains ttids but the corresponding tids are
increasing to keep commit order.
- Adjust tests

Modified:
    trunk/neo/handler.py
    trunk/neo/master/app.py
    trunk/neo/master/handlers/storage.py
    trunk/neo/master/transactions.py
    trunk/neo/storage/handlers/master.py
    trunk/neo/tests/master/testClientHandler.py
    trunk/neo/tests/master/testStorageHandler.py
    trunk/neo/tests/master/testTransactions.py

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -220,7 +220,7 @@ class EventHandler(object):
     def askLockInformation(self, conn, ttid, tid, oid_list):
         raise UnexpectedPacketError
 
-    def answerInformationLocked(self, conn, tid):
+    def answerInformationLocked(self, conn, ttid):
         raise UnexpectedPacketError
 
     def invalidateObjects(self, conn, tid, oid_list):

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -560,11 +560,12 @@ class Application(object):
             neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
         return (uuid, node, state, handler, node_ctor)
 
-    def onTransactionCommitted(self, tid, txn):
+    def onTransactionCommitted(self, txn):
         # I have received all the lock answers now:
         # - send a Notify Transaction Finished to the initiated client node
         # - Invalidate Objects to the other client nodes
         ttid = txn.getTTID()
+        tid = txn.getTID()
         transaction_node = txn.getNode()
         invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
         transaction_finished = Packets.AnswerTransactionFinished(ttid, tid)
@@ -582,7 +583,7 @@ class Application(object):
             getByUUID(storage_uuid).getConnection().notify(notify_unlock)
 
         # remove transaction from manager
-        self.tm.remove(transaction_node.getUUID(), tid)
+        self.tm.remove(transaction_node.getUUID(), ttid)
         self.setLastTransaction(tid)
 
     def getLastTransaction(self):

Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -62,16 +62,12 @@ class StorageServiceHandler(BaseServiceH
         p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
         conn.answer(p)
 
-    def answerInformationLocked(self, conn, tid):
+    def answerInformationLocked(self, conn, ttid):
         tm = self.app.tm
-
-        # If the given transaction ID is later than the last TID, the peer
-        # is crazy.
-        if tid > tm.getLastTID():
-            raise ProtocolError('TID too big')
-
+        if ttid not in tm:
+            raise ProtocolError('Unknown transaction')
         # transaction locked on this storage node
-        tm.lock(tid, conn.getUUID())
+        self.app.tm.lock(ttid, conn.getUUID())
 
     def notifyReplicationDone(self, conn, offset):
         node = self.app.nm.getByUUID(conn.getUUID())

Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -190,26 +190,27 @@ class TransactionManager(object):
     _next_ttid = 0
 
     def __init__(self, on_commit):
-        # tid -> transaction
-        self._tid_dict = {}
+        # ttid -> transaction
+        self._ttid_dict = {}
         # node -> transactions mapping
         self._node_dict = {}
         self._last_oid = None
         self._on_commit = on_commit
+        # queue filled with ttids pointing to transactions with increasing tids
         self._queue = []
 
-    def __getitem__(self, tid):
+    def __getitem__(self, ttid):
         """
             Return the transaction object for this TID
         """
         # XXX: used by unit tests only
-        return self._tid_dict[tid]
+        return self._ttid_dict[ttid]
 
-    def __contains__(self, tid):
+    def __contains__(self, ttid):
         """
             Returns True if this is a pending transaction
         """
-        return tid in self._tid_dict
+        return ttid in self._ttid_dict
 
     def getNextOIDList(self, num_oids):
         """ Generate a new OID list """
@@ -306,20 +307,20 @@ class TransactionManager(object):
             Discard all manager content
             This doesn't reset the last TID.
         """
-        self._tid_dict = {}
+        self._ttid_dict = {}
         self._node_dict = {}
 
     def hasPending(self):
         """
             Returns True if some transactions are pending
         """
-        return bool(self._tid_dict)
+        return bool(self._ttid_dict)
 
     def getPendingList(self):
         """
             Return the list of pending transaction IDs
         """
-        return self._tid_dict.keys()
+        return [txn.getTID() for txn in self._ttid_dict.values()]
 
     def begin(self, uuid, tid=None):
         """
@@ -345,38 +346,41 @@ class TransactionManager(object):
                 break
         else:
             tid = self._nextTID(ttid, divisor)
-            self._queue.append((node.getUUID(), tid))
+            self._queue.append((node.getUUID(), ttid))
         neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid))
         txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
-        self._tid_dict[tid] = txn
-        self._node_dict.setdefault(node, {})[tid] = txn
+        self._ttid_dict[ttid] = txn
+        self._node_dict.setdefault(node, {})[ttid] = txn
         return tid
 
-    def remove(self, uuid, tid):
+    def remove(self, uuid, ttid):
         """
             Remove a transaction, commited or aborted
         """
         try:
-            self._queue.remove((uuid, tid))
+            # only in case of an import:
+            self._queue.remove((uuid, ttid))
         except ValueError:
             # finish might not have been started
             pass
-        tid_dict = self._tid_dict
-        if tid in tid_dict:
+        ttid_dict = self._ttid_dict
+        if ttid in ttid_dict:
+            txn = ttid_dict[ttid]
+            tid = txn.getTID()
+            node = txn.getNode()
             # ...and tried to finish
-            node = tid_dict[tid].getNode()
-            del tid_dict[tid]
-            del self._node_dict[node][tid]
+            del ttid_dict[ttid]
+            del self._node_dict[node][ttid]
 
-    def lock(self, tid, uuid):
+    def lock(self, ttid, uuid):
         """
             Set that a node has locked the transaction.
             If transaction is completely locked, calls function given at
             instanciation time.
         """
-        assert tid in self._tid_dict, "Transaction not started"
-        txn = self._tid_dict[tid]
-        if txn.lock(uuid) and self._queue[0][1] == tid:
+        assert ttid in self._ttid_dict, "Transaction not started"
+        txn = self._ttid_dict[ttid]
+        if txn.lock(uuid) and self._queue[0][1] == ttid:
             # all storage are locked and we unlock the commit queue
             self._unlockPending()
 
@@ -387,8 +391,8 @@ class TransactionManager(object):
         """
         unlock = False
         # iterate over a copy because _unlockPending may alter the dict
-        for tid, txn in self._tid_dict.items():
-            if txn.forget(uuid) and self._queue[0][1] == tid:
+        for ttid, txn in self._ttid_dict.items():
+            if txn.forget(uuid) and self._queue[0][1] == ttid:
                 unlock = True
         if unlock:
             self._unlockPending()
@@ -399,15 +403,15 @@ class TransactionManager(object):
         pop = queue.pop
         insert = queue.insert
         on_commit = self._on_commit
-        get = self._tid_dict.get
+        get = self._ttid_dict.get
         while queue:
-            uuid, tid = pop(0)
-            txn = get(tid, None)
+            uuid, ttid = pop(0)
+            txn = get(ttid, None)
             # _queue can contain un-prepared transactions
             if txn is not None and txn.locked():
-                on_commit(tid, txn)
+                on_commit(txn)
             else:
-                insert(0, (uuid, tid))
+                insert(0, (uuid, ttid))
                 break
 
     def abortFor(self, node):
@@ -423,13 +427,13 @@ class TransactionManager(object):
         if node in self._node_dict:
             # remove transactions
             remove = self.remove
-            for tid in self._node_dict[node].keys():
-                remove(uuid, tid)
+            for ttid in self._node_dict[node].keys():
+                remove(uuid, ttid)
             # discard node entry
             del self._node_dict[node]
 
     def log(self):
         neo.logging.info('Transactions:')
-        for txn in self._tid_dict.itervalues():
+        for txn in self._ttid_dict.itervalues():
             neo.logging.info('  %r', txn)
 

Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -58,7 +58,7 @@ class MasterOperationHandler(BaseMasterH
             raise ProtocolError('Unknown transaction')
         self.app.tm.lock(ttid, tid, oid_list)
         if not conn.isClosed():
-            conn.answer(Packets.AnswerInformationLocked(tid))
+            conn.answer(Packets.AnswerInformationLocked(ttid))
 
     def notifyUnlockInformation(self, conn, ttid):
         if not ttid in self.app.tm:

Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -127,24 +127,24 @@ class MasterClientHandlerTests(NeoUnitTe
             ],
             'getPartitions': 2,
         })
-        service.askBeginTransaction(conn, None)
+        ttid = self.getNextTID()
+        service.askBeginTransaction(conn, ttid)
         oid_list = []
-        tid = self.app.tm.getLastTID()
         conn = self.getFakeConnection(client_uuid, self.client_address)
         self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
         # No packet sent if storage node is not ready
         self.assertFalse(self.app.isStorageReady(storage_uuid))
-        service.askFinishTransaction(conn, tid, oid_list)
+        service.askFinishTransaction(conn, ttid, oid_list)
         self.checkNoPacketSent(storage_conn)
         self.app.tm.abortFor(self.app.nm.getByUUID(client_uuid))
         # ...but AskLockInformation is sent if it is ready
         self.app.setStorageReady(storage_uuid)
         self.assertTrue(self.app.isStorageReady(storage_uuid))
-        service.askFinishTransaction(conn, tid, oid_list)
+        service.askFinishTransaction(conn, ttid, oid_list)
         self.checkAskLockInformation(storage_conn)
         self.assertEquals(len(self.app.tm.getPendingList()), 1)
-        apptid = self.app.tm.getPendingList()[0]
-        txn = self.app.tm[apptid]
+        txn = self.app.tm[ttid]
+        self.assertEquals(txn.getTID(), self.app.tm.getPendingList()[0])
         self.assertEquals(len(txn.getOIDList()), 0)
         self.assertEquals(len(txn.getUUIDList()), 1)
 

Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -104,15 +104,15 @@ class MasterStorageHandlerTests(NeoUnitT
         ttid = self.app.tm.begin(client_1.getUUID())
         tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list,
             msg_id)
-        self.assertTrue(tid in self.app.tm)
+        self.assertTrue(ttid in self.app.tm)
         # the first storage acknowledge the lock
-        self.service.answerInformationLocked(storage_conn_1, tid)
+        self.service.answerInformationLocked(storage_conn_1, ttid)
         self.checkNoPacketSent(client_conn_1)
         self.checkNoPacketSent(client_conn_2)
         self.checkNoPacketSent(storage_conn_1)
         self.checkNoPacketSent(storage_conn_2)
         # then the second
-        self.service.answerInformationLocked(storage_conn_2, tid)
+        self.service.answerInformationLocked(storage_conn_2, ttid)
         self.checkAnswerTransactionFinished(client_conn_1)
         self.checkInvalidateObjects(client_conn_2)
         self.checkNotifyUnlockInformation(storage_conn_1)
@@ -211,7 +211,7 @@ class MasterStorageHandlerTests(NeoUnitT
         ttid1 = tm.begin(node1.getUUID())
         tid1 = tm.prepare(client1, ttid1, 1, oid_list,
             [node1.getUUID(), node2.getUUID()], msg_id_1)
-        tm.lock(tid1, node2.getUUID())
+        tm.lock(ttid1, node2.getUUID())
         self.checkNoPacketSent(cconn1)
         # Storage 1 dies
         node1.setTemporarilyDown()
@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitT
             [node1.getUUID(), node2.getUUID()], msg_id_2)
         # T2: pending locking answer, client keeps waiting
         self.checkNoPacketSent(cconn2, check_notify=False)
-        tm.remove(node1.getUUID(), tid2)
+        tm.remove(node1.getUUID(), ttid2)
 
         # Transaction 3: 1 storage node involved, which won't die
         msg_id_3 = 3
@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitT
             [node2.getUUID(), ], msg_id_3)
         # T3: action not significant to this transacion, so no response
         self.checkNoPacketSent(cconn3, check_notify=False)
-        tm.remove(node1.getUUID(), tid3)
+        tm.remove(node1.getUUID(), ttid3)
 
     def test_answerPack(self):
         # Note: incomming status has no meaning here, so it's left to False.

Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -79,17 +79,17 @@ class testTransactionManager(NeoUnitTest
         tid = txnman.prepare(node, ttid, 1, oid_list, uuid_list, msg_id)
         self.assertTrue(txnman.hasPending())
         self.assertEqual(txnman.getPendingList()[0], tid)
-        self.assertEqual(txnman[tid].getTID(), tid)
-        txn = txnman[tid]
+        txn = txnman[ttid]
+        self.assertEqual(txn.getTID(), tid)
         self.assertEqual(txn.getUUIDList(), list(uuid_list))
         self.assertEqual(txn.getOIDList(), list(oid_list))
         # lock nodes
-        txnman.lock(tid, uuid1)
+        txnman.lock(ttid, uuid1)
         self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
-        txnman.lock(tid, uuid2)
+        txnman.lock(ttid, uuid2)
         self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
         # transaction finished
-        txnman.remove(client_uuid, tid)
+        txnman.remove(client_uuid, ttid)
         self.assertEqual(txnman.getPendingList(), [])
 
     def testAbortFor(self):
@@ -144,8 +144,8 @@ class testTransactionManager(NeoUnitTest
         ttid1 = tm.begin(client_uuid)
         tid1 = tm.prepare(client1, ttid1, 1, oid_list,
             [storage_1_uuid, storage_2_uuid], msg_id_1)
-        tm.lock(tid1, storage_2_uuid)
-        t1 = tm[tid1]
+        tm.lock(ttid1, storage_2_uuid)
+        t1 = tm[ttid1]
         self.assertFalse(t1.locked())
         # Storage 1 dies:
         # t1 is over
@@ -158,7 +158,7 @@ class testTransactionManager(NeoUnitTest
         ttid2 = tm.begin(client_uuid)
         tid2 = tm.prepare(client2, ttid2, 1, oid_list,
             [storage_1_uuid, storage_2_uuid], msg_id_2)
-        t2 = tm[tid2]
+        t2 = tm[ttid2]
         self.assertFalse(t2.locked())
         # Storage 1 dies:
         # t2 still waits for storage 2
@@ -172,7 +172,7 @@ class testTransactionManager(NeoUnitTest
         ttid3 = tm.begin(client_uuid)
         tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ],
             msg_id_3)
-        t3 = tm[tid3]
+        t3 = tm[ttid3]
         self.assertFalse(t3.locked())
         # Storage 1 dies:
         # t3 doesn't care
@@ -249,10 +249,10 @@ class testTransactionManager(NeoUnitTest
         ttid2 = tm.begin(uuid2)
         tid1 = tm.prepare(node1, ttid1, 1, [], [storage_uuid], 0)
         tid2 = tm.prepare(node2, ttid2, 1, [], [storage_uuid], 0)
-        tm.lock(tid2, storage_uuid)
+        tm.lock(ttid2, storage_uuid)
         # txn 2 is still blocked by txn 1
         self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
-        tm.lock(tid1, storage_uuid)
+        tm.lock(ttid1, storage_uuid)
         # both transactions are unlocked when txn 1 is fully locked
         self.assertEqual(len(callback.getNamedCalls('__call__')), 2)
 




More information about the Neo-report mailing list