[Neo-report] r2564 gregory - in /trunk/neo: master/ master/handlers/ tests/master/

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Dec 22 18:10:15 CET 2010


Author: gregory
Date: Wed Dec 22 18:10:14 2010
New Revision: 2564

Log:
Serialize only the last part of the 2PC.

Modified:
    trunk/neo/master/app.py
    trunk/neo/master/handlers/client.py
    trunk/neo/master/transactions.py
    trunk/neo/tests/master/testClientHandler.py
    trunk/neo/tests/master/testStorageHandler.py
    trunk/neo/tests/master/testTransactions.py

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -45,8 +45,6 @@ class Application(object):
     last_transaction = ZERO_TID
 
     def __init__(self, config):
-        self._queued_events = []
-
         # always use default connector for now
         self.connector_handler = getConnectorHandler()
 
@@ -584,9 +582,8 @@ class Application(object):
             getByUUID(storage_uuid).getConnection().notify(notify_unlock)
 
         # remove transaction from manager
-        self.tm.remove(tid)
+        self.tm.remove(transaction_node.getUUID(), tid)
         self.setLastTransaction(tid)
-        self.executeQueuedEvent()
 
     def getLastTransaction(self):
         return self.last_transaction
@@ -605,17 +602,3 @@ class Application(object):
     def isStorageReady(self, uuid):
         return uuid in self.storage_readiness
 
-    def queueEvent(self, func, conn, *args, **kw):
-        msg_id = conn.getPeerId()
-        self._queued_events.append((func, msg_id, conn, args, kw))
-
-    def executeQueuedEvent(self):
-        queue = self._queued_events
-        while queue:
-            func, msg_id, conn, args, kw = queue.pop(0)
-            if conn.isAborted() or conn.isClosed():
-                continue
-            conn.setPeerId(msg_id)
-            func(conn, *args, **kw)
-            break
-

Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -51,11 +51,8 @@ class ClientServiceHandler(MasterHandler
         """
             A client request a TID, nothing is kept about it until the finish.
         """
-        try:
-            conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
-                conn.getUUID(), tid)))
-        except DelayedError:
-            self.app.queueEvent(self.askBeginTransaction, conn, tid)
+        conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
+            conn.getUUID(), tid)))
 
     def askNewOIDs(self, conn, num_oids):
         app = self.app
@@ -88,13 +85,8 @@ class ClientServiceHandler(MasterHandler
         partitions = app.pt.getPartitions()
         peer_id = conn.getPeerId()
         node = app.nm.getByUUID(conn.getUUID())
-        try:
-            tid = app.tm.prepare(node, ttid, partitions, oid_list,
-                usable_uuid_set, peer_id)
-        except DelayedError:
-            app.queueEvent(self.askFinishTransaction, conn, ttid,
-                oid_list)
-            return
+        tid = app.tm.prepare(node, ttid, partitions, oid_list,
+            usable_uuid_set, peer_id)
 
         # check if greater and foreign OID was stored
         if app.tm.updateLastOID(oid_list):
@@ -124,7 +116,5 @@ class ClientServiceHandler(MasterHandler
             self.app.getLastTransaction()))
 
     def abortTransaction(self, conn, tid):
-        app = self.app
-        app.tm.remove(tid)
-        app.executeQueuedEvent()
+        self.app.tm.remove(conn.getUUID(), tid)
 

Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -187,10 +187,6 @@ class TransactionManager(object):
         Manage current transactions
     """
     _last_tid = ZERO_TID
-    # Transaction serialisation
-    # We don't need to use a real lock, as we are mono-threaded.
-    _locked = None
-
     _next_ttid = 0
 
     def __init__(self, on_commit):
@@ -200,6 +196,7 @@ class TransactionManager(object):
         self._node_dict = {}
         self._last_oid = None
         self._on_commit = on_commit
+        self._queue = []
 
     def __getitem__(self, tid):
         """
@@ -336,46 +333,36 @@ class TransactionManager(object):
             # No TID requested, generate a temporary one
             tid = self.getTTID()
         else:
-            # TID requested, take commit lock immediately
-            if self._locked is not None:
-                raise DelayedError()
-            self._locked = (uuid, tid)
+            self._queue.append((uuid, tid))
         return tid
 
     def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id):
         """
             Prepare a transaction to be finished
         """
-        locked = self._locked
-        uuid = node.getUUID()
-        if locked is not None and locked[1] == ttid:
-            assert locked[0] == uuid
-            # Transaction requested some TID upon begin, and it owns the commit
-            # lock since then.
-            tid = ttid
+        # XXX: not efficient but the list should be often small
+        for _, tid in self._queue:
+            if ttid == tid:
+                break
         else:
-            # Otherwise, acquire lock and allocate a new TID.
-            if locked is not None:
-                raise DelayedError()
             tid = self._nextTID(ttid, divisor)
-            self._locked = (uuid, tid)
-
+            self._queue.append((node.getUUID(), tid))
         self.setLastTID(tid)
+        neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid))
         txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
         self._tid_dict[tid] = txn
         self._node_dict.setdefault(node, {})[tid] = txn
         return tid
 
-    def remove(self, tid):
+    def remove(self, uuid, tid):
         """
             Remove a transaction, commited or aborted
         """
-        locked = self._locked
-        if locked is not None and tid == locked[1]:
-            # If TID has the lock, release it.
-            # It might legitimately not have the lock (ex: a transaction
-            # aborting, which didn't request a TID upon begin)
-            self._locked = None
+        try:
+            self._queue.remove((uuid, tid))
+        except ValueError:
+            # finish might not have been started
+            pass
         tid_dict = self._tid_dict
         if tid in tid_dict:
             # ...and tried to finish
@@ -392,7 +379,7 @@ class TransactionManager(object):
         txn = self._tid_dict[tid]
         if txn.lock(uuid):
             # all storage are locked
-            self._on_commit(tid, txn)
+            self._unlockPending()
 
     def forget(self, uuid):
         """
@@ -401,22 +388,37 @@ class TransactionManager(object):
         """
         for tid, txn in self._tid_dict.items():
             if txn.forget(uuid):
+                self._unlockPending()
+
+    def _unlockPending(self):
+        # unlock pending transactions
+        while self._queue:
+            tid = self._queue[0][1]
+            # _queue can contain un-prepared transactions
+            txn = self._tid_dict.get(tid, None)
+            if txn is not None and txn.locked():
+                self._queue.pop()
                 self._on_commit(tid, txn)
+            else:
+                break
 
     def abortFor(self, node):
         """
             Abort pending transactions initiated by a node
         """
-        locked = self._locked
-        if locked is not None and locked[0] == node.getUUID():
-            self._locked = None
+        neo.logging.debug('Abort for %s', node)
         # nothing to do
         if node not in self._node_dict:
             return
         # remove transactions
+        uuid = node.getUUID()
         remove = self.remove
         for tid in self._node_dict[node].keys():
-            remove(tid)
+            remove(uuid, tid)
+        # the code below is usefull only during an import
+        for nuuid, ntid in list(self._queue):
+            if nuuid == uuid:
+                self._queue.remove((uuid, tid))
         # discard node entry
         del self._node_dict[node]
 

Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -82,16 +82,6 @@ class MasterClientHandlerTests(NeoUnitTe
         # Client asks for a TID
         self.app.tm = tm_org
         service.askBeginTransaction(conn, tid1)
-        # If asking again for a TID, call is queued
-        call_marker = []
-        def queueEvent(*args, **kw):
-            call_marker.append((args, kw))
-        self.app.queueEvent = queueEvent
-        service.askBeginTransaction(conn, tid2)
-        self.assertEqual(len(call_marker), 1)
-        args, kw = call_marker[0]
-        self.assertEqual(kw, {})
-        self.assertEqual(args, (service.askBeginTransaction, conn, tid2))
 
     def test_08_askNewOIDs(self):
         service = self.service

Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitT
             [node1.getUUID(), node2.getUUID()], msg_id_2)
         # T2: pending locking answer, client keeps waiting
         self.checkNoPacketSent(cconn2, check_notify=False)
-        tm.remove(tid2)
+        tm.remove(node1.getUUID(), tid2)
 
         # Transaction 3: 1 storage node involved, which won't die
         msg_id_3 = 3
@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitT
             [node2.getUUID(), ], msg_id_3)
         # T3: action not significant to this transacion, so no response
         self.checkNoPacketSent(cconn3, check_notify=False)
-        tm.remove(tid3)
+        tm.remove(node1.getUUID(), tid3)
 
     def test_answerPack(self):
         # Note: incomming status has no meaning here, so it's left to False.

Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -84,7 +84,7 @@ class testTransactionManager(NeoUnitTest
         txnman.lock(tid, uuid2)
         self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
         # transaction finished
-        txnman.remove(tid)
+        txnman.remove(client_uuid, tid)
         self.assertEqual(txnman.getPendingList(), [])
 
     def testAbortFor(self):
@@ -108,7 +108,7 @@ class testTransactionManager(NeoUnitTest
         self.assertEqual(txnman.getPendingList(), [])
         self.assertFalse(txnman.hasPending())
         # ...and the lock is available
-        txnman.begin(client_uuid, self.getNextTID())
+        txnman.begin(self.getNextTID())
 
     def test_getNextOIDList(self):
         txnman = TransactionManager(lambda tid, txn: None)
@@ -146,7 +146,7 @@ class testTransactionManager(NeoUnitTest
         # t1 is over
         self.assertTrue(t1.forget(storage_1_uuid))
         self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
-        tm.remove(tid1)
+        tm.remove(client_uuid, tid1)
 
         # Transaction 2: 2 storage nodes involved, one will die
         msg_id_2 = 2
@@ -160,7 +160,7 @@ class testTransactionManager(NeoUnitTest
         self.assertFalse(t2.forget(storage_1_uuid))
         self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
         self.assertTrue(t2.lock(storage_2_uuid))
-        tm.remove(tid2)
+        tm.remove(client_uuid, tid2)
 
         # Transaction 3: 1 storage node involved, which won't die
         msg_id_3 = 3
@@ -174,7 +174,7 @@ class testTransactionManager(NeoUnitTest
         self.assertFalse(t3.forget(storage_1_uuid))
         self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
         self.assertTrue(t3.lock(storage_2_uuid))
-        tm.remove(tid3)
+        tm.remove(client_uuid, tid3)
 
     def testTIDUtils(self):
         """
@@ -215,16 +215,13 @@ class testTransactionManager(NeoUnitTest
         ttid2 = self.getNextTID()
         tid1 = tm.begin(client_uuid, ttid1)
         self.assertEqual(tid1, ttid1)
-        self.assertRaises(DelayedError, tm.begin, client_uuid, ttid2)
-        tm.remove(tid1)
-        tm.remove(tm.begin(client_uuid, ttid2))
+        tm.remove(client_uuid, tid1)
         # Without a requested TID, lock spans from prepare to remove only
         ttid3 = tm.begin(client_uuid)
         ttid4 = tm.begin(client_uuid) # Doesn't raise
         node = Mock({'getUUID': client_uuid, '__hash__': 0})
         tid4 = tm.prepare(node, ttid4, 1, [], [], 0)
-        self.assertRaises(DelayedError, tm.prepare, node, ttid3, 1, [], [], 0)
-        tm.remove(tid4)
+        tm.remove(client_uuid, tid4)
         tm.prepare(node, ttid3, 1, [], [], 0)
 
     def testClientDisconectsAfterBegin(self):
@@ -233,11 +230,10 @@ class testTransactionManager(NeoUnitTest
         tm = TransactionManager(lambda tid, txn: None)
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
-        tm.begin(client1_uuid, tid1)
-        self.assertRaises(DelayedError, tm.begin, client2_uuid, tid2)
+        tm.begin(tid1)
         node1 = Mock({'getUUID': client1_uuid, '__hash__': 0})
         tm.abortFor(node1)
-        tm.begin(client2_uuid, tid2)
+        tm.begin(tid2)
 
 if __name__ == '__main__':
     unittest.main()




More information about the Neo-report mailing list