[Neo-report] r2159 gregory - in /trunk/neo: storage/ storage/handlers/ tests/storage/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Jun 11 10:17:25 CEST 2010


Author: gregory
Date: Fri Jun 11 10:17:21 2010
New Revision: 2159

Log:
Don't process queued lock requests related to an aborted transaction.

When a delayed store is processed, don't take the lock if the transaction
is no longer registered (it may have been aborted due to a timeout while
the store was delayed).
Change the TransactionManager API: add register(), which must be called
before any storeObject/storeTransaction operation.

Modified:
    trunk/neo/storage/handlers/client.py
    trunk/neo/storage/transactions.py
    trunk/neo/tests/storage/testClientHandler.py
    trunk/neo/tests/storage/testTransactions.py

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Fri Jun 11 10:17:21 2010
@@ -43,16 +43,23 @@ class ClientOperationHandler(BaseClientA
 
     def askStoreTransaction(self, conn, tid, user, desc,
                                   ext, oid_list):
-        uuid = conn.getUUID()
-        self.app.tm.storeTransaction(uuid, tid, oid_list, user, desc, ext,
+        self.app.tm.register(conn.getUUID(), tid)
+        self.app.tm.storeTransaction(tid, oid_list, user, desc, ext,
             False)
         conn.answer(Packets.AnswerStoreTransaction(tid))
 
     def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
             tid, request_time):
-        uuid = conn.getUUID()
+        if tid not in self.app.tm:
+            # transaction was aborted, cancel this event
+            logging.info('Forget store of %s:%s by %s delayed by %s',
+                    dump(oid), dump(serial), dump(tid),
+                    dump(self.app.tm.getLockingTID(oid)))
+            # send an answer as the client side is waiting for it
+            conn.answer(Packets.AnswerStoreObject(0, oid, serial))
+            return
         try:
-            self.app.tm.storeObject(uuid, tid, serial, oid, compression,
+            self.app.tm.storeObject(tid, serial, oid, compression,
                     checksum, data, None)
         except ConflictError, err:
             # resolvable or not
@@ -71,6 +78,8 @@ class ClientOperationHandler(BaseClientA
 
     def askStoreObject(self, conn, oid, serial,
                              compression, checksum, data, tid):
+        # register the transaction
+        self.app.tm.register(conn.getUUID(), tid)
         self._askStoreObject(conn, oid, serial, compression, checksum, data,
             tid, time.time())
 

Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Fri Jun 11 10:17:21 2010
@@ -123,9 +123,9 @@ class TransactionManager(object):
         """
         return tid in self._transaction_dict
 
-    def _getTransaction(self, tid, uuid):
+    def register(self, uuid, tid):
         """
-            Get or create the transaction object for this tid
+            Register a transaction, it may be already registered
         """
         transaction = self._transaction_dict.get(tid, None)
         if transaction is None:
@@ -193,17 +193,18 @@ class TransactionManager(object):
             self._loid = self._loid_seen
             self._app.dm.setLastOID(self._loid)
 
-    def storeTransaction(self, uuid, tid, oid_list, user, desc, ext, packed):
+    def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
         """
             Store transaction information received from client node
         """
-        transaction = self._getTransaction(tid, uuid)
+        assert tid in self, "Transaction not registered"
+        transaction = self._transaction_dict[tid]
         transaction.prepare(oid_list, user, desc, ext, packed)
 
     def getLockingTID(self, oid):
         return self._store_lock_dict.get(oid)
 
-    def storeObject(self, uuid, tid, serial, oid, compression, checksum, data,
+    def storeObject(self, tid, serial, oid, compression, checksum, data,
             value_serial):
         """
             Store an object received from client node
@@ -235,7 +236,8 @@ class TransactionManager(object):
             raise ConflictError(locking_tid)
 
         # store object
-        transaction = self._getTransaction(tid, uuid)
+        assert tid in self, "Transaction not registered"
+        transaction = self._transaction_dict[tid]
         transaction.addObject(oid, compression, checksum, data, value_serial)
 
         # update loid

Modified: trunk/neo/tests/storage/testClientHandler.py
==============================================================================
--- trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] Fri Jun 11 10:17:21 2010
@@ -44,7 +44,7 @@ class StorageClientHandlerTests(NeoTestB
         self.app.store_lock_dict = {}
         self.app.load_lock_dict = {}
         self.app.event_queue = deque()
-        self.app.tm = Mock()
+        self.app.tm = Mock({'__contains__': True})
         # handler
         self.operation = ClientOperationHandler(self.app)
         # set pmn
@@ -211,7 +211,7 @@ class StorageClientHandlerTests(NeoTestB
         oid, serial, comp, checksum, data = self._getObject()
         self.operation.askStoreObject(conn, oid, serial, comp, checksum, 
                 data, tid)
-        self._checkStoreObjectCalled(uuid, tid, serial, oid, comp,
+        self._checkStoreObjectCalled(tid, serial, oid, comp,
                 checksum, data, None)
         self.checkAnswerStoreObject(conn)
 

Modified: trunk/neo/tests/storage/testTransactions.py
==============================================================================
--- trunk/neo/tests/storage/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testTransactions.py [iso-8859-1] Fri Jun 11 10:17:21 2010
@@ -96,7 +96,7 @@ class TransactionManagerTests(NeoTestBas
 
     def _storeTransactionObjects(self, tid, txn):
         for i, oid in enumerate(txn[0]):
-            self.manager.storeObject(self.getNewUUID(), tid, None,
+            self.manager.storeObject(tid, None,
                     oid, 1, str(i), '0' + str(i), None)
 
     def _getObject(self, value):
@@ -117,16 +117,17 @@ class TransactionManagerTests(NeoTestBas
     def _checkQueuedEventExecuted(self, number=1):
         calls = self.app.mockGetNamedCalls('executeQueuedEvents')
         self.assertEqual(len(calls), number)
-        
+
     def testSimpleCase(self):
         """ One node, one transaction, not abort """
         uuid = self.getNewUUID()
         tid, txn = self._getTransaction()
         serial1, object1 = self._getObject(1)
         serial2, object2 = self._getObject(2)
-        self.manager.storeTransaction(uuid, tid, *txn)
-        self.manager.storeObject(uuid, tid, serial1, *object1)
-        self.manager.storeObject(uuid, tid, serial2, *object2)
+        self.manager.register(uuid,tid)
+        self.manager.storeTransaction(tid, *txn)
+        self.manager.storeObject(tid, serial1, *object1)
+        self.manager.storeObject(tid, serial2, *object2)
         self.assertTrue(tid in self.manager)
         self.manager.lock(tid, txn[0])
         self._checkTransactionStored(tid, [object1, object2], txn)
@@ -141,15 +142,17 @@ class TransactionManagerTests(NeoTestBas
         tid2, txn2 = self._getTransaction()
         serial, obj = self._getObject(1)
         # first transaction lock the object
-        self.manager.storeTransaction(uuid, tid1, *txn1)
+        self.manager.register(uuid, tid1)
+        self.manager.storeTransaction(tid1, *txn1)
         self.assertTrue(tid1 in self.manager)
         self._storeTransactionObjects(tid1, txn1)
         self.manager.lock(tid1, txn1[0])
         # the second is delayed
-        self.manager.storeTransaction(uuid, tid2, *txn2)
+        self.manager.register(uuid, tid2)
+        self.manager.storeTransaction(tid2, *txn2)
         self.assertTrue(tid2 in self.manager)
-        self.assertRaises(DelayedError, self.manager.storeObject, 
-                uuid, tid2, serial, *obj)
+        self.assertRaises(DelayedError, self.manager.storeObject,
+                tid2, serial, *obj)
 
     def testUnresolvableConflict(self):
         """ A newer transaction has already modified an object """
@@ -158,16 +161,18 @@ class TransactionManagerTests(NeoTestBas
         tid2, txn2 = self._getTransaction()
         serial, obj = self._getObject(1)
         # the (later) transaction lock (change) the object
-        self.manager.storeTransaction(uuid, tid2, *txn2)
-        self.manager.storeObject(uuid, tid2, serial, *obj)
+        self.manager.register(uuid, tid2)
+        self.manager.storeTransaction(tid2, *txn2)
+        self.manager.storeObject(tid2, serial, *obj)
         self.assertTrue(tid2 in self.manager)
         self._storeTransactionObjects(tid2, txn2)
         self.manager.lock(tid2, txn2[0])
         # the previous it's not using the latest version
-        self.manager.storeTransaction(uuid, tid1, *txn1)
+        self.manager.register(uuid, tid1)
+        self.manager.storeTransaction(tid1, *txn1)
         self.assertTrue(tid1 in self.manager)
-        self.assertRaises(ConflictError, self.manager.storeObject, 
-                uuid, tid1, serial, *obj)
+        self.assertRaises(ConflictError, self.manager.storeObject,
+                tid1, serial, *obj)
 
     def testResolvableConflict(self):
         """ Try to store an object with the lastest revision """
@@ -177,9 +182,10 @@ class TransactionManagerTests(NeoTestBas
         next_serial = self.getNextTID(serial)
         # try to store without the last revision
         self.app.dm = Mock({'getObjectHistory': [next_serial]})
-        self.manager.storeTransaction(uuid, tid, *txn)
-        self.assertRaises(ConflictError, self.manager.storeObject, 
-                uuid, tid, serial, *obj)
+        self.manager.register(uuid, tid)
+        self.manager.storeTransaction(tid, *txn)
+        self.assertRaises(ConflictError, self.manager.storeObject,
+                tid, serial, *obj)
 
     def testLockDelayed(self):
         """ Check lock delaytion"""
@@ -191,18 +197,20 @@ class TransactionManagerTests(NeoTestBas
         serial1, obj1 = self._getObject(1)
         serial2, obj2 = self._getObject(2)
         # first transaction lock objects
-        self.manager.storeTransaction(uuid1, tid1, *txn1)
+        self.manager.register(uuid1, tid1)
+        self.manager.storeTransaction(tid1, *txn1)
         self.assertTrue(tid1 in self.manager)
-        self.manager.storeObject(uuid1, tid1, serial1, *obj1)
-        self.manager.storeObject(uuid1, tid1, serial1, *obj2)
+        self.manager.storeObject(tid1, serial1, *obj1)
+        self.manager.storeObject(tid1, serial1, *obj2)
         self.manager.lock(tid1, txn1[0])
         # second transaction is delayed
-        self.manager.storeTransaction(uuid2, tid2, *txn2)
+        self.manager.register(uuid2, tid2)
+        self.manager.storeTransaction(tid2, *txn2)
         self.assertTrue(tid2 in self.manager)
         self.assertRaises(DelayedError, self.manager.storeObject,
-                uuid2, tid2, serial1, *obj1)
+                tid2, serial1, *obj1)
         self.assertRaises(DelayedError, self.manager.storeObject,
-                uuid2, tid2, serial2, *obj2)
+                tid2, serial2, *obj2)
 
     def testLockConflict(self):
         """ Check lock conflict """
@@ -214,26 +222,29 @@ class TransactionManagerTests(NeoTestBas
         serial1, obj1 = self._getObject(1)
         serial2, obj2 = self._getObject(2)
         # the second transaction lock objects
-        self.manager.storeTransaction(uuid2, tid2, *txn2)
-        self.manager.storeObject(uuid2, tid2, serial1, *obj1)
-        self.manager.storeObject(uuid2, tid2, serial2, *obj2)
+        self.manager.register(uuid2, tid2)
+        self.manager.storeTransaction(tid2, *txn2)
+        self.manager.storeObject(tid2, serial1, *obj1)
+        self.manager.storeObject(tid2, serial2, *obj2)
         self.assertTrue(tid2 in self.manager)
         self.manager.lock(tid2, txn1[0])
         # the first get a conflict
-        self.manager.storeTransaction(uuid1, tid1, *txn1)
+        self.manager.register(uuid1, tid1)
+        self.manager.storeTransaction(tid1, *txn1)
         self.assertTrue(tid1 in self.manager)
         self.assertRaises(ConflictError, self.manager.storeObject,
-                uuid1, tid1, serial1, *obj1)
+                tid1, serial1, *obj1)
         self.assertRaises(ConflictError, self.manager.storeObject,
-                uuid1, tid1, serial2, *obj2)
+                tid1, serial2, *obj2)
 
     def testAbortUnlocked(self):
         """ Abort a non-locked transaction """
         uuid = self.getNewUUID()
         tid, txn = self._getTransaction()
         serial, obj = self._getObject(1)
-        self.manager.storeTransaction(uuid, tid, *txn)
-        self.manager.storeObject(uuid, tid, serial, *obj)
+        self.manager.register(uuid, tid)
+        self.manager.storeTransaction(tid, *txn)
+        self.manager.storeObject(tid, serial, *obj)
         self.assertTrue(tid in self.manager)
         # transaction is not locked
         self.manager.abort(tid)
@@ -245,7 +256,8 @@ class TransactionManagerTests(NeoTestBas
         """ Try to abort a locked transaction """
         uuid = self.getNewUUID()
         tid, txn = self._getTransaction()
-        self.manager.storeTransaction(uuid, tid, *txn)
+        self.manager.register(uuid, tid)
+        self.manager.storeTransaction(tid, *txn)
         self._storeTransactionObjects(tid, txn)
         # lock transaction
         self.manager.lock(tid, txn[0])
@@ -255,7 +267,7 @@ class TransactionManagerTests(NeoTestBas
         for oid in txn[0]:
             self.assertTrue(self.manager.loadLocked(oid))
         self._checkQueuedEventExecuted(number=0)
-        
+
     def testAbortForNode(self):
         """ Abort transaction for a node """
         uuid1 = self.getNewUUID()
@@ -264,10 +276,13 @@ class TransactionManagerTests(NeoTestBas
         tid1, txn1 = self._getTransaction()
         tid2, txn2 = self._getTransaction()
         tid3, txn3 = self._getTransaction()
-        self.manager.storeTransaction(uuid1, tid1, *txn1)
+        self.manager.register(uuid1, tid1)
+        self.manager.register(uuid2, tid2)
+        self.manager.register(uuid2, tid3)
+        self.manager.storeTransaction(tid1, *txn1)
         # node 2 owns tid2 & tid3 and lock tid2 only
-        self.manager.storeTransaction(uuid2, tid2, *txn2)
-        self.manager.storeTransaction(uuid2, tid3, *txn3)
+        self.manager.storeTransaction(tid2, *txn2)
+        self.manager.storeTransaction(tid3, *txn3)
         self._storeTransactionObjects(tid2, txn2)
         self.manager.lock(tid2, txn2[0])
         self.assertTrue(tid1 in self.manager)
@@ -279,12 +294,13 @@ class TransactionManagerTests(NeoTestBas
         self.assertTrue(tid2 in self.manager)
         self.assertFalse(tid3 in self.manager)
         self._checkQueuedEventExecuted(number=1)
-        
+
     def testReset(self):
         """ Reset the manager """
         uuid = self.getNewUUID()
         tid, txn = self._getTransaction()
-        self.manager.storeTransaction(uuid, tid, *txn)
+        self.manager.register(uuid, tid)
+        self.manager.storeTransaction(tid, *txn)
         self._storeTransactionObjects(tid, txn)
         self.manager.lock(tid, txn[0])
         self.assertTrue(tid in self.manager)
@@ -299,7 +315,8 @@ class TransactionManagerTests(NeoTestBas
         tid2, txn2 = self._getTransaction()
         serial1, obj1 = self._getObject(1)
         serial2, obj2 = self._getObject(2)
-        self.manager.storeObject(uuid, tid1, serial1, *obj1)
+        self.manager.register(uuid, tid1)
+        self.manager.storeObject(tid1, serial1, *obj1)
         self.assertEqual(self.manager.getObjectFromTransaction(tid2, obj1[0]),
             None)
         self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj2[0]),
@@ -313,7 +330,8 @@ class TransactionManagerTests(NeoTestBas
         oid1 = obj1[0]
         tid1, txn1 = self._getTransaction()
         self.assertEqual(self.manager.getLockingTID(oid1), None)
-        self.manager.storeObject(uuid, tid1, serial1, *obj1)
+        self.manager.register(uuid, tid1)
+        self.manager.storeObject(tid1, serial1, *obj1)
         self.assertEqual(self.manager.getLockingTID(oid1), tid1)
 
 if __name__ == "__main__":





More information about the Neo-report mailing list