[Neo-report] r2530 vincent - in /trunk/neo: ./ client/ master/ master/handlers/ tests/ tes...

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Dec 14 16:56:46 CET 2010


Author: vincent
Date: Tue Dec 14 16:56:46 2010
New Revision: 2530

Log:
Serialise the whole 2PC at cluster level.

This ensures invalidations are sent in strictly ascending TID order.

Modified:
    trunk/neo/client/app.py
    trunk/neo/handler.py
    trunk/neo/master/app.py
    trunk/neo/master/handlers/client.py
    trunk/neo/master/handlers/storage.py
    trunk/neo/master/transactions.py
    trunk/neo/protocol.py
    trunk/neo/tests/master/testClientHandler.py
    trunk/neo/tests/master/testStorageHandler.py
    trunk/neo/tests/master/testTransactions.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -601,11 +601,10 @@ class Application(object):
         if self.local_var.txn is not None:
             raise NeoException, 'local_var is not clean in tpc_begin'
         # use the given TID or request a new one to the master
-        self.local_var.tid = tid
-        if tid is None:
-            self._askPrimary(Packets.AskBeginTransaction())
-            if self.local_var.tid is None:
-                raise NEOStorageError('tpc_begin failed')
+        self._askPrimary(Packets.AskBeginTransaction(tid))
+        if self.local_var.tid is None:
+            raise NEOStorageError('tpc_begin failed')
+        assert tid in (None, self.local_var.tid), (tid, self.local_var.tid)
         self.local_var.txn = transaction
 
     @profiler_decorator
@@ -837,6 +836,7 @@ class Application(object):
             except:
                 neo.logging.error('Exception in tpc_abort while notifying ' \
                     'storage node %r of abortion, ignoring.', conn, exc_info=1)
+        self._getMasterConnection().notify(p)
 
         # Just wait for responses to arrive. If any leads to an exception,
         # log it and continue: we *must* eat all answers to not disturb the

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -196,7 +196,7 @@ class EventHandler(object):
     def commitTransaction(self, conn, tid):
         raise UnexpectedPacketError
 
-    def askBeginTransaction(self, conn):
+    def askBeginTransaction(self, conn, tid):
         raise UnexpectedPacketError
 
     def answerBeginTransaction(self, conn, tid):

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -45,6 +45,7 @@ class Application(object):
     last_transaction = ZERO_TID
 
     def __init__(self, config):
+        self._queued_events = []
 
         # always use default connector for now
         self.connector_handler = getConnectorHandler()
@@ -578,3 +579,17 @@ class Application(object):
     def isStorageReady(self, uuid):
         return uuid in self.storage_readiness
 
+    def queueEvent(self, func, conn, *args, **kw):
+        msg_id = conn.getPeerId()
+        self._queued_events.append((func, msg_id, conn, args, kw))
+
+    def executeQueuedEvent(self):
+        queue = self._queued_events
+        while queue:
+            func, msg_id, conn, args, kw = queue.pop(0)
+            if conn.isAborted() or conn.isClosed():
+                continue
+            conn.setPeerId(msg_id)
+            func(conn, *args, **kw)
+            break
+

Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -20,7 +20,7 @@ import neo
 from neo.protocol import NodeStates, Packets, ProtocolError
 from neo.master.handlers import MasterHandler
 from neo.util import dump
-
+from neo.master.transactions import DelayedError
 
 class ClientServiceHandler(MasterHandler):
     """ Handler dedicated to client during service state """
@@ -47,11 +47,14 @@ class ClientServiceHandler(MasterHandler
         conn.notify(Packets.NotifyNodeInformation(node_list))
         conn.answer(Packets.AnswerNodeInformation())
 
-    def askBeginTransaction(self, conn):
+    def askBeginTransaction(self, conn, tid):
         """
             A client request a TID, nothing is kept about it until the finish.
         """
-        conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin()))
+        try:
+            conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(tid)))
+        except DelayedError:
+            self.app.queueEvent(self.askBeginTransaction, conn, tid)
 
     def askNewOIDs(self, conn, num_oids):
         app = self.app
@@ -108,3 +111,8 @@ class ClientServiceHandler(MasterHandler
         conn.answer(Packets.AnswerLastTransaction(
             self.app.getLastTransaction()))
 
+    def abortTransaction(self, conn, tid):
+        app = self.app
+        app.tm.remove(tid)
+        app.executeQueuedEvent()
+

Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -107,6 +107,7 @@ class StorageServiceHandler(BaseServiceH
         # remove transaction from manager
         tm.remove(tid)
         app.setLastTransaction(tid)
+        app.executeQueuedEvent()
 
     def notifyReplicationDone(self, conn, offset):
         node = self.app.nm.getByUUID(conn.getUUID())

Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -84,6 +84,9 @@ def addTID(ptid, offset):
         higher = (d.year, d.month, d.day, d.hour, d.minute)
     return packTID((higher, lower))
 
+class DelayedError(Exception):
+    pass
+
 class Transaction(object):
     """
         A pending transaction
@@ -177,6 +180,9 @@ class TransactionManager(object):
         Manage current transactions
     """
     _last_tid = ZERO_TID
+    # Transaction serialisation
+    # We don't need to use a real lock, as we are mono-threaded.
+    _locked = None
 
     def __init__(self):
         # tid -> transaction
@@ -272,11 +278,16 @@ class TransactionManager(object):
         """
         return self._tid_dict.keys()
 
-    def begin(self):
+    def begin(self, tid=None):
         """
             Generate a new TID
         """
-        return self._nextTID()
+        if self._locked is not None:
+            raise DelayedError()
+        if tid is None:
+            tid = self._nextTID()
+        self._locked = tid
+        return tid
 
     def prepare(self, node, tid, oid_list, uuid_list, msg_id):
         """
@@ -291,9 +302,14 @@ class TransactionManager(object):
         """
             Remove a transaction, commited or aborted
         """
-        node = self._tid_dict[tid].getNode()
-        del self._tid_dict[tid]
-        del self._node_dict[node][tid]
+        assert self._locked == tid, (self._locked, tid)
+        self._locked = None
+        tid_dict = self._tid_dict
+        if tid in tid_dict:
+            # ...and tried to finish
+            node = tid_dict[tid].getNode()
+            del tid_dict[tid]
+            del self._node_dict[node][tid]
 
     def lock(self, tid, uuid):
         """

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -742,6 +742,11 @@ class AskBeginTransaction(Packet):
     """
     Ask to begin a new transaction. C -> PM.
     """
+    def _encode(self, tid):
+        return _encodeTID(tid)
+
+    def _decode(self, body):
+        return (_decodeTID(unpack('8s', body)[0]), )
 
 class AnswerBeginTransaction(Packet):
     """

Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -70,7 +70,7 @@ class MasterClientHandlerTests(NeoUnitTe
         # client call it
         client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
         conn = self.getFakeConnection(client_uuid, self.client_address)
-        service.askBeginTransaction(conn)
+        service.askBeginTransaction(conn, None)
         self.assertTrue(ltid < self.app.tm.getLastTID())
 
     def test_08_askNewOIDs(self):
@@ -102,7 +102,7 @@ class MasterClientHandlerTests(NeoUnitTe
             'getPartition': 0,
             'getCellList': [Mock({'getUUID': storage_uuid})],
         })
-        service.askBeginTransaction(conn)
+        service.askBeginTransaction(conn, None)
         oid_list = []
         tid = self.app.tm.getLastTID()
         conn = self.getFakeConnection(client_uuid, self.client_address)

Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -201,9 +201,6 @@ class MasterStorageHandlerTests(NeoUnitT
         client1, cconn1 = self._getClient()
         client2, cconn2 = self._getClient()
         client3, cconn3 = self._getClient()
-        tid1 = self.getNextTID()
-        tid2 = self.getNextTID(tid1)
-        tid3 = self.getNextTID(tid2)
         oid_list = [self.getOID(), ]
 
         # Some shortcuts to simplify test code
@@ -215,25 +212,13 @@ class MasterStorageHandlerTests(NeoUnitT
         # Transaction 1: 2 storage nodes involved, one will die and the other
         # already answered node lock
         msg_id_1 = 1
-        tm.prepare(client1, tid1, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_1)
+        tid1 = tm.begin()
+        tm.prepare(client1, tid1, oid_list,
+            [node1.getUUID(), node2.getUUID()], msg_id_1)
         tm.lock(tid1, node2.getUUID())
-        # Transaction 2: 2 storage nodes involved, one will die
-        msg_id_2 = 2
-        tm.prepare(client2, tid2, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_2)
-        # Transaction 3: 1 storage node involved, which won't die
-        msg_id_3 = 3
-        tm.prepare(client3, tid3, oid_list, [node2.getUUID(), ], msg_id_3)
-
-        # Assert initial state
-        self.checkNoPacketSent(cconn1)
-        self.checkNoPacketSent(cconn2)
-        self.checkNoPacketSent(cconn3)
-
         # Storage 1 dies
         node1.setTemporarilyDown()
         self.service.nodeLost(conn1, node1)
-
-        # Check state after node lost
         # T1: last locking node lost, client receives AnswerTransactionFinished
         self.checkAnswerTransactionFinished(cconn1)
         self.checkNotifyUnlockInformation(conn2)
@@ -241,10 +226,24 @@ class MasterStorageHandlerTests(NeoUnitT
         # ...and notifications are sent to other clients
         self.checkInvalidateObjects(cconn2)
         self.checkInvalidateObjects(cconn3)
+
+        # Transaction 2: 2 storage nodes involved, one will die
+        msg_id_2 = 2
+        tid2 = tm.begin()
+        tm.prepare(client2, tid2, oid_list,
+            [node1.getUUID(), node2.getUUID()], msg_id_2)
         # T2: pending locking answer, client keeps waiting
         self.checkNoPacketSent(cconn2, check_notify=False)
+        tm.remove(tid2)
+
+        # Transaction 3: 1 storage node involved, which won't die
+        msg_id_3 = 3
+        tid3 = tm.begin()
+        tm.prepare(client3, tid3, oid_list,
+            [node2.getUUID(), ], msg_id_3)
         # T3: action not significant to this transacion, so no response
         self.checkNoPacketSent(cconn3, check_notify=False)
+        tm.remove(tid3)
 
     def test_answerPack(self):
         # Note: incomming status has no meaning here, so it's left to False.

Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -22,7 +22,7 @@ from neo.tests import NeoUnitTestBase
 from neo.protocol import ZERO_TID
 
 from neo.master.transactions import Transaction, TransactionManager
-from neo.master.transactions import packTID, unpackTID, addTID
+from neo.master.transactions import packTID, unpackTID, addTID, DelayedError
 
 class testTransactionManager(NeoUnitTestBase):
 
@@ -90,26 +90,19 @@ class testTransactionManager(NeoUnitTest
         storage_2_uuid = self.makeUUID(2)
         txnman = TransactionManager()
         # register 4 transactions made by two nodes
-        tid11 = txnman.begin()
-        txnman.prepare(node1, tid11, oid_list, [storage_1_uuid], 1)
-        tid12 = txnman.begin()
-        txnman.prepare(node1, tid12, oid_list, [storage_1_uuid], 2)
-        tid21 = txnman.begin()
-        txnman.prepare(node2, tid21, oid_list, [storage_2_uuid], 3)
-        tid22 = txnman.begin()
-        txnman.prepare(node2, tid22, oid_list, [storage_2_uuid], 4)
-        self.assertTrue(tid11 < tid12 < tid21 < tid22)
-        self.assertEqual(len(txnman.getPendingList()), 4)
-        # abort transactions of one node
-        txnman.abortFor(node1)
-        tid_list = txnman.getPendingList()
-        self.assertEqual(len(tid_list), 2)
-        self.assertTrue(tid21 in tid_list)
-        self.assertTrue(tid22 in tid_list)
-        # then the other
+        self.assertEqual(txnman.getPendingList(), [])
+        tid1 = txnman.begin()
+        txnman.prepare(node1, tid1, oid_list, [storage_1_uuid], 1)
+        self.assertEqual(txnman.getPendingList(), [tid1])
+        # abort transactions of another node, transaction stays
         txnman.abortFor(node2)
+        self.assertEqual(txnman.getPendingList(), [tid1])
+        # abort transactions of requesting node, transaction is removed
+        txnman.abortFor(node1)
         self.assertEqual(txnman.getPendingList(), [])
         self.assertFalse(txnman.hasPending())
+        # ...and we can start another transaction
+        tid2 = txnman.begin()
 
     def test_getNextOIDList(self):
         txnman = TransactionManager()
@@ -138,6 +131,9 @@ class testTransactionManager(NeoUnitTest
         txnman.setLastTID(ntid)
         self.assertEqual(txnman.getLastTID(), ntid)
         self.assertTrue(ntid > tid1)
+        # If a new TID is generated, DelayedError is raised
+        self.assertRaises(DelayedError, txnman.begin)
+        txnman.remove(tid1)
         # new trancation
         node2 = Mock({'__hash__': 2})
         tid2 = txnman.begin()
@@ -159,36 +155,39 @@ class testTransactionManager(NeoUnitTest
         tid1 = tm.begin()
         tm.prepare(client1, tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
         tm.lock(tid1, storage_2_uuid)
+        t1 = tm[tid1]
+        self.assertFalse(t1.locked())
+        # Storage 1 dies:
+        # t1 is over
+        self.assertTrue(t1.forget(storage_1_uuid))
+        self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
+        tm.remove(tid1)
+
         # Transaction 2: 2 storage nodes involved, one will die
         msg_id_2 = 2
         tid2 = tm.begin()
         tm.prepare(client2, tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
-        # Transaction 3: 1 storage node involved, which won't die
-        msg_id_3 = 3
-        tid3 = tm.begin()
-        tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
-
-        t1 = tm[tid1]
         t2 = tm[tid2]
-        t3 = tm[tid3]
-
-        # Assert initial state
-        self.assertFalse(t1.locked())
         self.assertFalse(t2.locked())
-        self.assertFalse(t3.locked())
-
         # Storage 1 dies:
-        # t1 is over
-        self.assertTrue(t1.forget(storage_1_uuid))
-        self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
         # t2 still waits for storage 2
         self.assertFalse(t2.forget(storage_1_uuid))
         self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
         self.assertTrue(t2.lock(storage_2_uuid))
+        tm.remove(tid2)
+
+        # Transaction 3: 1 storage node involved, which won't die
+        msg_id_3 = 3
+        tid3 = tm.begin()
+        tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
+        t3 = tm[tid3]
+        self.assertFalse(t3.locked())
+        # Storage 1 dies:
         # t3 doesn't care
         self.assertFalse(t3.forget(storage_1_uuid))
         self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
         self.assertTrue(t3.lock(storage_2_uuid))
+        tm.remove(tid3)
 
     def testTIDUtils(self):
         """
@@ -216,5 +215,19 @@ class testTransactionManager(NeoUnitTest
             unpackTID(addTID(packTID(((2010, 11, 30, 23, 59), 2**32 - 1)), 1)),
             ((2010, 12, 1, 0, 0), 0))
 
+    def testTransactionLock(self):
+        """
+        Transaction lock is present to ensure invalidation TIDs are sent in
+        strictly increasing order.
+        Note: this implementation might change later, to allow more paralelism.
+        """
+        tm = TransactionManager()
+        tid1 = tm.begin()
+        # Further calls fail with DelayedError
+        self.assertRaises(DelayedError, tm.begin)
+        # ...until tid1 gets removed
+        tm.remove(tid1)
+        tid2 = tm.begin()
+
 if __name__ == '__main__':
     unittest.main()

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -236,8 +236,10 @@ class ProtocolTests(NeoUnitTestBase):
 
 
     def test_32_askBeginTransaction(self):
-        p = Packets.AskBeginTransaction()
-        self.assertEqual(p.decode(), ())
+        tid = self.getNextTID()
+        p = Packets.AskBeginTransaction(tid)
+        ptid = p.decode()[0]
+        self.assertEqual(tid, ptid)
 
     def test_33_answerBeginTransaction(self):
         tid = self.getNextTID()




More information about the Neo-report mailing list