[Neo-report] r2530 vincent - in /trunk/neo: ./ client/ master/ master/handlers/ tests/ tes...
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Dec 14 16:56:46 CET 2010
Author: vincent
Date: Tue Dec 14 16:56:46 2010
New Revision: 2530
Log:
Serialise the whole 2PC (two-phase commit) at cluster level.
This ensures invalidations are sent in strictly ascending TID order.
Modified:
trunk/neo/client/app.py
trunk/neo/handler.py
trunk/neo/master/app.py
trunk/neo/master/handlers/client.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/transactions.py
trunk/neo/protocol.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testStorageHandler.py
trunk/neo/tests/master/testTransactions.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -601,11 +601,10 @@ class Application(object):
if self.local_var.txn is not None:
raise NeoException, 'local_var is not clean in tpc_begin'
# use the given TID or request a new one to the master
- self.local_var.tid = tid
- if tid is None:
- self._askPrimary(Packets.AskBeginTransaction())
- if self.local_var.tid is None:
- raise NEOStorageError('tpc_begin failed')
+ self._askPrimary(Packets.AskBeginTransaction(tid))
+ if self.local_var.tid is None:
+ raise NEOStorageError('tpc_begin failed')
+ assert tid in (None, self.local_var.tid), (tid, self.local_var.tid)
self.local_var.txn = transaction
@profiler_decorator
@@ -837,6 +836,7 @@ class Application(object):
except:
neo.logging.error('Exception in tpc_abort while notifying ' \
'storage node %r of abortion, ignoring.', conn, exc_info=1)
+ self._getMasterConnection().notify(p)
# Just wait for responses to arrive. If any leads to an exception,
# log it and continue: we *must* eat all answers to not disturb the
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -196,7 +196,7 @@ class EventHandler(object):
def commitTransaction(self, conn, tid):
raise UnexpectedPacketError
- def askBeginTransaction(self, conn):
+ def askBeginTransaction(self, conn, tid):
raise UnexpectedPacketError
def answerBeginTransaction(self, conn, tid):
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -45,6 +45,7 @@ class Application(object):
last_transaction = ZERO_TID
def __init__(self, config):
+ self._queued_events = []
# always use default connector for now
self.connector_handler = getConnectorHandler()
@@ -578,3 +579,17 @@ class Application(object):
def isStorageReady(self, uuid):
return uuid in self.storage_readiness
+ def queueEvent(self, func, conn, *args, **kw):
+ msg_id = conn.getPeerId()
+ self._queued_events.append((func, msg_id, conn, args, kw))
+
+ def executeQueuedEvent(self):
+ queue = self._queued_events
+ while queue:
+ func, msg_id, conn, args, kw = queue.pop(0)
+ if conn.isAborted() or conn.isClosed():
+ continue
+ conn.setPeerId(msg_id)
+ func(conn, *args, **kw)
+ break
+
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -20,7 +20,7 @@ import neo
from neo.protocol import NodeStates, Packets, ProtocolError
from neo.master.handlers import MasterHandler
from neo.util import dump
-
+from neo.master.transactions import DelayedError
class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """
@@ -47,11 +47,14 @@ class ClientServiceHandler(MasterHandler
conn.notify(Packets.NotifyNodeInformation(node_list))
conn.answer(Packets.AnswerNodeInformation())
- def askBeginTransaction(self, conn):
+ def askBeginTransaction(self, conn, tid):
"""
A client request a TID, nothing is kept about it until the finish.
"""
- conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin()))
+ try:
+ conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(tid)))
+ except DelayedError:
+ self.app.queueEvent(self.askBeginTransaction, conn, tid)
def askNewOIDs(self, conn, num_oids):
app = self.app
@@ -108,3 +111,8 @@ class ClientServiceHandler(MasterHandler
conn.answer(Packets.AnswerLastTransaction(
self.app.getLastTransaction()))
+ def abortTransaction(self, conn, tid):
+ app = self.app
+ app.tm.remove(tid)
+ app.executeQueuedEvent()
+
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -107,6 +107,7 @@ class StorageServiceHandler(BaseServiceH
# remove transaction from manager
tm.remove(tid)
app.setLastTransaction(tid)
+ app.executeQueuedEvent()
def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID())
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -84,6 +84,9 @@ def addTID(ptid, offset):
higher = (d.year, d.month, d.day, d.hour, d.minute)
return packTID((higher, lower))
+class DelayedError(Exception):
+ pass
+
class Transaction(object):
"""
A pending transaction
@@ -177,6 +180,9 @@ class TransactionManager(object):
Manage current transactions
"""
_last_tid = ZERO_TID
+ # Transaction serialisation
+ # We don't need to use a real lock, as we are mono-threaded.
+ _locked = None
def __init__(self):
# tid -> transaction
@@ -272,11 +278,16 @@ class TransactionManager(object):
"""
return self._tid_dict.keys()
- def begin(self):
+ def begin(self, tid=None):
"""
Generate a new TID
"""
- return self._nextTID()
+ if self._locked is not None:
+ raise DelayedError()
+ if tid is None:
+ tid = self._nextTID()
+ self._locked = tid
+ return tid
def prepare(self, node, tid, oid_list, uuid_list, msg_id):
"""
@@ -291,9 +302,14 @@ class TransactionManager(object):
"""
Remove a transaction, commited or aborted
"""
- node = self._tid_dict[tid].getNode()
- del self._tid_dict[tid]
- del self._node_dict[node][tid]
+ assert self._locked == tid, (self._locked, tid)
+ self._locked = None
+ tid_dict = self._tid_dict
+ if tid in tid_dict:
+ # ...and tried to finish
+ node = tid_dict[tid].getNode()
+ del tid_dict[tid]
+ del self._node_dict[node][tid]
def lock(self, tid, uuid):
"""
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -742,6 +742,11 @@ class AskBeginTransaction(Packet):
"""
Ask to begin a new transaction. C -> PM.
"""
+ def _encode(self, tid):
+ return _encodeTID(tid)
+
+ def _decode(self, body):
+ return (_decodeTID(unpack('8s', body)[0]), )
class AnswerBeginTransaction(Packet):
"""
Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -70,7 +70,7 @@ class MasterClientHandlerTests(NeoUnitTe
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
- service.askBeginTransaction(conn)
+ service.askBeginTransaction(conn, None)
self.assertTrue(ltid < self.app.tm.getLastTID())
def test_08_askNewOIDs(self):
@@ -102,7 +102,7 @@ class MasterClientHandlerTests(NeoUnitTe
'getPartition': 0,
'getCellList': [Mock({'getUUID': storage_uuid})],
})
- service.askBeginTransaction(conn)
+ service.askBeginTransaction(conn, None)
oid_list = []
tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address)
Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -201,9 +201,6 @@ class MasterStorageHandlerTests(NeoUnitT
client1, cconn1 = self._getClient()
client2, cconn2 = self._getClient()
client3, cconn3 = self._getClient()
- tid1 = self.getNextTID()
- tid2 = self.getNextTID(tid1)
- tid3 = self.getNextTID(tid2)
oid_list = [self.getOID(), ]
# Some shortcuts to simplify test code
@@ -215,25 +212,13 @@ class MasterStorageHandlerTests(NeoUnitT
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
- tm.prepare(client1, tid1, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_1)
+ tid1 = tm.begin()
+ tm.prepare(client1, tid1, oid_list,
+ [node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(tid1, node2.getUUID())
- # Transaction 2: 2 storage nodes involved, one will die
- msg_id_2 = 2
- tm.prepare(client2, tid2, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_2)
- # Transaction 3: 1 storage node involved, which won't die
- msg_id_3 = 3
- tm.prepare(client3, tid3, oid_list, [node2.getUUID(), ], msg_id_3)
-
- # Assert initial state
- self.checkNoPacketSent(cconn1)
- self.checkNoPacketSent(cconn2)
- self.checkNoPacketSent(cconn3)
-
# Storage 1 dies
node1.setTemporarilyDown()
self.service.nodeLost(conn1, node1)
-
- # Check state after node lost
# T1: last locking node lost, client receives AnswerTransactionFinished
self.checkAnswerTransactionFinished(cconn1)
self.checkNotifyUnlockInformation(conn2)
@@ -241,10 +226,24 @@ class MasterStorageHandlerTests(NeoUnitT
# ...and notifications are sent to other clients
self.checkInvalidateObjects(cconn2)
self.checkInvalidateObjects(cconn3)
+
+ # Transaction 2: 2 storage nodes involved, one will die
+ msg_id_2 = 2
+ tid2 = tm.begin()
+ tm.prepare(client2, tid2, oid_list,
+ [node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False)
+ tm.remove(tid2)
+
+ # Transaction 3: 1 storage node involved, which won't die
+ msg_id_3 = 3
+ tid3 = tm.begin()
+ tm.prepare(client3, tid3, oid_list,
+ [node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False)
+ tm.remove(tid3)
def test_answerPack(self):
# Note: incomming status has no meaning here, so it's left to False.
Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -22,7 +22,7 @@ from neo.tests import NeoUnitTestBase
from neo.protocol import ZERO_TID
from neo.master.transactions import Transaction, TransactionManager
-from neo.master.transactions import packTID, unpackTID, addTID
+from neo.master.transactions import packTID, unpackTID, addTID, DelayedError
class testTransactionManager(NeoUnitTestBase):
@@ -90,26 +90,19 @@ class testTransactionManager(NeoUnitTest
storage_2_uuid = self.makeUUID(2)
txnman = TransactionManager()
# register 4 transactions made by two nodes
- tid11 = txnman.begin()
- txnman.prepare(node1, tid11, oid_list, [storage_1_uuid], 1)
- tid12 = txnman.begin()
- txnman.prepare(node1, tid12, oid_list, [storage_1_uuid], 2)
- tid21 = txnman.begin()
- txnman.prepare(node2, tid21, oid_list, [storage_2_uuid], 3)
- tid22 = txnman.begin()
- txnman.prepare(node2, tid22, oid_list, [storage_2_uuid], 4)
- self.assertTrue(tid11 < tid12 < tid21 < tid22)
- self.assertEqual(len(txnman.getPendingList()), 4)
- # abort transactions of one node
- txnman.abortFor(node1)
- tid_list = txnman.getPendingList()
- self.assertEqual(len(tid_list), 2)
- self.assertTrue(tid21 in tid_list)
- self.assertTrue(tid22 in tid_list)
- # then the other
+ self.assertEqual(txnman.getPendingList(), [])
+ tid1 = txnman.begin()
+ txnman.prepare(node1, tid1, oid_list, [storage_1_uuid], 1)
+ self.assertEqual(txnman.getPendingList(), [tid1])
+ # abort transactions of another node, transaction stays
txnman.abortFor(node2)
+ self.assertEqual(txnman.getPendingList(), [tid1])
+ # abort transactions of requesting node, transaction is removed
+ txnman.abortFor(node1)
self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending())
+ # ...and we can start another transaction
+ tid2 = txnman.begin()
def test_getNextOIDList(self):
txnman = TransactionManager()
@@ -138,6 +131,9 @@ class testTransactionManager(NeoUnitTest
txnman.setLastTID(ntid)
self.assertEqual(txnman.getLastTID(), ntid)
self.assertTrue(ntid > tid1)
+ # If a new TID is generated, DelayedError is raised
+ self.assertRaises(DelayedError, txnman.begin)
+ txnman.remove(tid1)
# new trancation
node2 = Mock({'__hash__': 2})
tid2 = txnman.begin()
@@ -159,36 +155,39 @@ class testTransactionManager(NeoUnitTest
tid1 = tm.begin()
tm.prepare(client1, tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(tid1, storage_2_uuid)
+ t1 = tm[tid1]
+ self.assertFalse(t1.locked())
+ # Storage 1 dies:
+ # t1 is over
+ self.assertTrue(t1.forget(storage_1_uuid))
+ self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
+ tm.remove(tid1)
+
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
tid2 = tm.begin()
tm.prepare(client2, tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
- # Transaction 3: 1 storage node involved, which won't die
- msg_id_3 = 3
- tid3 = tm.begin()
- tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
-
- t1 = tm[tid1]
t2 = tm[tid2]
- t3 = tm[tid3]
-
- # Assert initial state
- self.assertFalse(t1.locked())
self.assertFalse(t2.locked())
- self.assertFalse(t3.locked())
-
# Storage 1 dies:
- # t1 is over
- self.assertTrue(t1.forget(storage_1_uuid))
- self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
# t2 still waits for storage 2
self.assertFalse(t2.forget(storage_1_uuid))
self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
self.assertTrue(t2.lock(storage_2_uuid))
+ tm.remove(tid2)
+
+ # Transaction 3: 1 storage node involved, which won't die
+ msg_id_3 = 3
+ tid3 = tm.begin()
+ tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
+ t3 = tm[tid3]
+ self.assertFalse(t3.locked())
+ # Storage 1 dies:
# t3 doesn't care
self.assertFalse(t3.forget(storage_1_uuid))
self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
self.assertTrue(t3.lock(storage_2_uuid))
+ tm.remove(tid3)
def testTIDUtils(self):
"""
@@ -216,5 +215,19 @@ class testTransactionManager(NeoUnitTest
unpackTID(addTID(packTID(((2010, 11, 30, 23, 59), 2**32 - 1)), 1)),
((2010, 12, 1, 0, 0), 0))
+ def testTransactionLock(self):
+ """
+ Transaction lock is present to ensure invalidation TIDs are sent in
+ strictly increasing order.
+ Note: this implementation might change later, to allow more paralelism.
+ """
+ tm = TransactionManager()
+ tid1 = tm.begin()
+ # Further calls fail with DelayedError
+ self.assertRaises(DelayedError, tm.begin)
+ # ...until tid1 gets removed
+ tm.remove(tid1)
+ tid2 = tm.begin()
+
if __name__ == '__main__':
unittest.main()
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Dec 14 16:56:46 2010
@@ -236,8 +236,10 @@ class ProtocolTests(NeoUnitTestBase):
def test_32_askBeginTransaction(self):
- p = Packets.AskBeginTransaction()
- self.assertEqual(p.decode(), ())
+ tid = self.getNextTID()
+ p = Packets.AskBeginTransaction(tid)
+ ptid = p.decode()[0]
+ self.assertEqual(tid, ptid)
def test_33_answerBeginTransaction(self):
tid = self.getNextTID()
More information about the Neo-report
mailing list