[Neo-report] r2176 gregory - in /trunk/neo: client/ tests/ tests/client/

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Jun 18 16:17:11 CEST 2010


Author: gregory
Date: Fri Jun 18 16:17:10 2010
New Revision: 2176

Log:
Abort a transaction on all involved storages.

This fixes an issue where some locks were not released on a storage because the
node list used to send abort notifications was built from the content of
data_dict, which does not keep information about unresolved conflicts.

Modified:
    trunk/neo/client/app.py
    trunk/neo/tests/__init__.py
    trunk/neo/tests/client/testClientApp.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri Jun 18 16:17:10 2010
@@ -116,6 +116,7 @@ class ThreadContext(object):
             'asked_object': 0,
             'undo_conflict_oid_list': [],
             'undo_error_oid_list': [],
+            'involved_nodes': set(),
         }
 
 
@@ -607,12 +608,14 @@ class Application(object):
         self.local_var.object_serial_dict[oid] = (serial, version)
         getConnForCell = self.cp.getConnForCell
         queue = self.local_var.queue
+        add_involved_nodes = self.local_var.involved_nodes.add
         for cell in cell_list:
             conn = getConnForCell(cell)
             if conn is None:
                 continue
             try:
                 conn.ask(p, on_timeout=on_timeout, queue=queue)
+                add_involved_nodes(cell.getNode())
             except ConnectionClosed:
                 continue
 
@@ -733,6 +736,7 @@ class Application(object):
         p = Packets.AskStoreTransaction(tid, str(transaction.user),
             str(transaction.description), dumps(transaction._extension),
             local_var.data_dict.keys())
+        add_involved_nodes = self.local_var.involved_nodes.add
         for cell in self._getCellListForTID(tid, writable=True):
             logging.debug("voting object %s %s", cell.getAddress(),
                 cell.getState())
@@ -743,6 +747,7 @@ class Application(object):
             local_var.txn_voted = False
             try:
                 self._askStorage(conn, p)
+                add_involved_nodes(cell.getNode())
             except ConnectionClosed:
                 continue
 
@@ -768,19 +773,10 @@ class Application(object):
             return
 
         tid = self.local_var.tid
-        getCellListForOID = self._getCellListForOID
-        # select nodes where transaction was stored
-        node_set = set([x.getNode() for x in self._getCellListForTID(tid,
-            writable=True)])
-        # select nodes where objects were stored
-        for oid in self.local_var.data_dict.iterkeys():
-            node_set |= set([x.getNode() for x in getCellListForOID(oid,
-                writable=True)])
-
         p = Packets.AbortTransaction(tid)
         getConnForNode = self.cp.getConnForNode
         # cancel transaction one all those nodes
-        for node in node_set:
+        for node in self.local_var.involved_nodes:
             conn = getConnForNode(node)
             if conn is None:
                 continue

Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Fri Jun 18 16:17:10 2010
@@ -292,6 +292,9 @@ class NeoTestBase(unittest.TestCase):
     def checkInvalidateObjects(self, conn, **kw):
         return self.checkNotifyPacket(conn, Packets.InvalidateObjects, **kw)
 
+    def checkAbortTransaction(self, conn, **kw):
+        return self.checkNotifyPacket(conn, Packets.AbortTransaction, **kw)
+
     def checkAnswerTransactionFinished(self, conn, **kw):
         return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw)
 

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Fri Jun 18 16:17:10 2010
@@ -605,6 +605,7 @@ class ClientApplicationTests(NeoTestBase
         app.cp = Mock({ 'getConnForNode': ReturnValues(conn1, conn2), })
         # fake data
         app.local_var.data_dict = {oid1: '', oid2: ''}
+        app.local_var.involved_nodes = set([cell1, cell2])
         app.tpc_abort(txn)
         # will check if there was just one call/packet :
         self.checkNotifyPacket(conn1, Packets.AbortTransaction)
@@ -616,6 +617,69 @@ class ClientApplicationTests(NeoTestBase
         self.assertEquals(app.local_var.txn_voted, False)
         self.assertEquals(app.local_var.txn_finished, False)
 
+    def test_tpc_abort3(self):
+        """ check that abort is sent to all nodes involved in the transaction """
+        app = self.getApp()
+        # three partitions/storages: one per object/transaction
+        app.num_partitions = 3
+        app.num_replicas = 0
+        tid = self.makeTID(0)  # on partition 0
+        oid1 = self.makeOID(1) # on partition 1, conflicting
+        oid2 = self.makeOID(2) # on partition 2
+        # storage nodes
+        address1 = ('127.0.0.1', 10000)
+        address2 = ('127.0.0.1', 10001)
+        address3 = ('127.0.0.1', 10002)
+        app.nm.createMaster(address=address1)
+        app.nm.createStorage(address=address2)
+        app.nm.createStorage(address=address3)
+        # answer packets
+        packet1 = Packets.AnswerStoreTransaction(tid=tid)
+        packet2 = Packets.AnswerStoreObject(conflicting=1, oid=oid1, serial=tid)
+        packet3 = Packets.AnswerStoreObject(conflicting=0, oid=oid2, serial=tid)
+        [p.setId(i) for p, i in zip([packet1, packet2, packet3], range(3))]
+        conn1 = Mock({'__repr__': 'conn1', 'getAddress': address1, 'fakeReceived': packet1})
+        conn2 = Mock({'__repr__': 'conn2', 'getAddress': address2, 'fakeReceived': packet2})
+        conn3 = Mock({'__repr__': 'conn3', 'getAddress': address3, 'fakeReceived': packet3})
+        node1 = Mock({'__repr__': 'node1', '__hash__': 1, 'getConnection': conn1})
+        node2 = Mock({'__repr__': 'node2', '__hash__': 2, 'getConnection': conn2})
+        node3 = Mock({'__repr__': 'node3', '__hash__': 3, 'getConnection': conn3})
+        cell1 = Mock({ 'getNode': node1, '__hash__': 1, 'getConnection': conn1})
+        cell2 = Mock({ 'getNode': node2, '__hash__': 2, 'getConnection': conn2})
+        cell3 = Mock({ 'getNode': node3, '__hash__': 3, 'getConnection': conn3})
+        # fake environment
+        app.pt = Mock({
+            'getCellListForTID': [cell1],
+            'getCellListForOID': ReturnValues([cell2], [cell3]),
+        })
+        app.cp = Mock({'getConnForCell': ReturnValues(conn2, conn3, conn1)})
+        app.dispatcher = Mock()
+        app.master_conn = Mock({'__hash__': 0})
+        txn = self.makeTransactionObject()
+        app.local_var.txn, app.local_var.tid = txn, tid
+        class Dispatcher(object):
+            def pending(self, queue):
+                return not queue.empty()
+        app.dispatcher = Dispatcher()
+        # begin a transaction
+        app.tpc_begin(txn, tid)
+        # conflict occurs on storage 2
+        app.store(oid1, tid, 'DATA', None, txn)
+        app.store(oid2, tid, 'DATA', None, txn)
+        app.local_var.queue.put((conn2, packet2))
+        app.local_var.queue.put((conn3, packet3))
+        # vote fails as the conflict is not resolved, nothing is sent to storage 3
+        self.assertRaises(ConflictError, app.tpc_vote, txn, failing_tryToResolveConflict)
+        class ConnectionPool(object):
+            def getConnForNode(self, node):
+                return node.getConnection()
+        app.cp = ConnectionPool()
+        # abort must be sent to storage 1 and 2
+        app.tpc_abort(txn)
+        self.checkAbortTransaction(app.master_conn)
+        self.checkAbortTransaction(conn2)
+        self.checkAbortTransaction(conn3)
+
     def test_tpc_finish1(self):
         # ignore mismatch transaction
         app = self.getApp()





More information about the Neo-report mailing list