[Neo-report] r2176 gregory - in /trunk/neo: client/ tests/ tests/client/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Jun 18 16:17:11 CEST 2010
Author: gregory
Date: Fri Jun 18 16:17:10 2010
New Revision: 2176
Log:
Abort a transaction on all involved storages.
This fixes an issue where some locks were not released on a storage because the
node list used to send abort notifications was built from the content of
data_dict, which doesn't keep information about unresolved conflicts.
Modified:
trunk/neo/client/app.py
trunk/neo/tests/__init__.py
trunk/neo/tests/client/testClientApp.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri Jun 18 16:17:10 2010
@@ -116,6 +116,7 @@ class ThreadContext(object):
'asked_object': 0,
'undo_conflict_oid_list': [],
'undo_error_oid_list': [],
+ 'involved_nodes': set(),
}
@@ -607,12 +608,14 @@ class Application(object):
self.local_var.object_serial_dict[oid] = (serial, version)
getConnForCell = self.cp.getConnForCell
queue = self.local_var.queue
+ add_involved_nodes = self.local_var.involved_nodes.add
for cell in cell_list:
conn = getConnForCell(cell)
if conn is None:
continue
try:
conn.ask(p, on_timeout=on_timeout, queue=queue)
+ add_involved_nodes(cell.getNode())
except ConnectionClosed:
continue
@@ -733,6 +736,7 @@ class Application(object):
p = Packets.AskStoreTransaction(tid, str(transaction.user),
str(transaction.description), dumps(transaction._extension),
local_var.data_dict.keys())
+ add_involved_nodes = self.local_var.involved_nodes.add
for cell in self._getCellListForTID(tid, writable=True):
logging.debug("voting object %s %s", cell.getAddress(),
cell.getState())
@@ -743,6 +747,7 @@ class Application(object):
local_var.txn_voted = False
try:
self._askStorage(conn, p)
+ add_involved_nodes(cell.getNode())
except ConnectionClosed:
continue
@@ -768,19 +773,10 @@ class Application(object):
return
tid = self.local_var.tid
- getCellListForOID = self._getCellListForOID
- # select nodes where transaction was stored
- node_set = set([x.getNode() for x in self._getCellListForTID(tid,
- writable=True)])
- # select nodes where objects were stored
- for oid in self.local_var.data_dict.iterkeys():
- node_set |= set([x.getNode() for x in getCellListForOID(oid,
- writable=True)])
-
p = Packets.AbortTransaction(tid)
getConnForNode = self.cp.getConnForNode
# cancel transaction one all those nodes
- for node in node_set:
+ for node in self.local_var.involved_nodes:
conn = getConnForNode(node)
if conn is None:
continue
Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Fri Jun 18 16:17:10 2010
@@ -292,6 +292,9 @@ class NeoTestBase(unittest.TestCase):
def checkInvalidateObjects(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.InvalidateObjects, **kw)
+ def checkAbortTransaction(self, conn, **kw):
+ return self.checkNotifyPacket(conn, Packets.AbortTransaction, **kw)
+
def checkAnswerTransactionFinished(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw)
Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Fri Jun 18 16:17:10 2010
@@ -605,6 +605,7 @@ class ClientApplicationTests(NeoTestBase
app.cp = Mock({ 'getConnForNode': ReturnValues(conn1, conn2), })
# fake data
app.local_var.data_dict = {oid1: '', oid2: ''}
+ app.local_var.involved_nodes = set([cell1, cell2])
app.tpc_abort(txn)
# will check if there was just one call/packet :
self.checkNotifyPacket(conn1, Packets.AbortTransaction)
@@ -616,6 +617,69 @@ class ClientApplicationTests(NeoTestBase
self.assertEquals(app.local_var.txn_voted, False)
self.assertEquals(app.local_var.txn_finished, False)
+ def test_tpc_abort3(self):
+ """ check that abort is sent to all nodes involved in the transaction """
+ app = self.getApp()
+ # three partitions/storages: one per object/transaction
+ app.num_partitions = 3
+ app.num_replicas = 0
+ tid = self.makeTID(0) # on partition 0
+ oid1 = self.makeOID(1) # on partition 1, conflicting
+ oid2 = self.makeOID(2) # on partition 2
+ # storage nodes
+ address1 = ('127.0.0.1', 10000)
+ address2 = ('127.0.0.1', 10001)
+ address3 = ('127.0.0.1', 10002)
+ app.nm.createMaster(address=address1)
+ app.nm.createStorage(address=address2)
+ app.nm.createStorage(address=address3)
+ # answer packets
+ packet1 = Packets.AnswerStoreTransaction(tid=tid)
+ packet2 = Packets.AnswerStoreObject(conflicting=1, oid=oid1, serial=tid)
+ packet3 = Packets.AnswerStoreObject(conflicting=0, oid=oid2, serial=tid)
+ [p.setId(i) for p, i in zip([packet1, packet2, packet3], range(3))]
+ conn1 = Mock({'__repr__': 'conn1', 'getAddress': address1, 'fakeReceived': packet1})
+ conn2 = Mock({'__repr__': 'conn2', 'getAddress': address2, 'fakeReceived': packet2})
+ conn3 = Mock({'__repr__': 'conn3', 'getAddress': address3, 'fakeReceived': packet3})
+ node1 = Mock({'__repr__': 'node1', '__hash__': 1, 'getConnection': conn1})
+ node2 = Mock({'__repr__': 'node2', '__hash__': 2, 'getConnection': conn2})
+ node3 = Mock({'__repr__': 'node3', '__hash__': 3, 'getConnection': conn3})
+ cell1 = Mock({ 'getNode': node1, '__hash__': 1, 'getConnection': conn1})
+ cell2 = Mock({ 'getNode': node2, '__hash__': 2, 'getConnection': conn2})
+ cell3 = Mock({ 'getNode': node3, '__hash__': 3, 'getConnection': conn3})
+ # fake environment
+ app.pt = Mock({
+ 'getCellListForTID': [cell1],
+ 'getCellListForOID': ReturnValues([cell2], [cell3]),
+ })
+ app.cp = Mock({'getConnForCell': ReturnValues(conn2, conn3, conn1)})
+ app.dispatcher = Mock()
+ app.master_conn = Mock({'__hash__': 0})
+ txn = self.makeTransactionObject()
+ app.local_var.txn, app.local_var.tid = txn, tid
+ class Dispatcher(object):
+ def pending(self, queue):
+ return not queue.empty()
+ app.dispatcher = Dispatcher()
+ # begin a transaction
+ app.tpc_begin(txn, tid)
+ # conflict occurs on storage 2
+ app.store(oid1, tid, 'DATA', None, txn)
+ app.store(oid2, tid, 'DATA', None, txn)
+ app.local_var.queue.put((conn2, packet2))
+ app.local_var.queue.put((conn3, packet3))
+ # vote fails as the conflict is not resolved, nothing is sent to storage 3
+ self.assertRaises(ConflictError, app.tpc_vote, txn, failing_tryToResolveConflict)
+ class ConnectionPool(object):
+ def getConnForNode(self, node):
+ return node.getConnection()
+ app.cp = ConnectionPool()
+ # abort must be sent to storage 1 and 2
+ app.tpc_abort(txn)
+ self.checkAbortTransaction(app.master_conn)
+ self.checkAbortTransaction(conn2)
+ self.checkAbortTransaction(conn3)
+
def test_tpc_finish1(self):
# ignore mismatch transaction
app = self.getApp()
More information about the Neo-report
mailing list