[Neo-report] r2563 gregory - in /trunk/neo: master/ master/handlers/ tests/master/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Dec 22 18:10:03 CET 2010
Author: gregory
Date: Wed Dec 22 18:10:02 2010
New Revision: 2563
Log:
Move _afterLock to app and move logic from handlers to transaction manager.
Modified:
trunk/neo/master/app.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/transactions.py
trunk/neo/tests/master/testTransactions.py
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Wed Dec 22 18:10:02 2010
@@ -53,7 +53,7 @@ class Application(object):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
- self.tm = TransactionManager()
+ self.tm = TransactionManager(self.onTransactionCommitted)
self.name = config.getCluster()
self.server = config.getBind()
@@ -562,6 +562,32 @@ class Application(object):
neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
return (uuid, node, state, handler, node_ctor)
+ def onTransactionCommitted(self, tid, txn):
+ # I have received all the lock answers now:
+ # - send a Notify Transaction Finished to the initiated client node
+ # - Invalidate Objects to the other client nodes
+ ttid = txn.getTTID()
+ transaction_node = txn.getNode()
+ invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
+ transaction_finished = Packets.AnswerTransactionFinished(ttid, tid)
+ for client_node in self.nm.getClientList(only_identified=True):
+ c = client_node.getConnection()
+ if client_node is transaction_node:
+ c.answer(transaction_finished, msg_id=txn.getMessageId())
+ else:
+ c.notify(invalidate_objects)
+
+ # Unlock Information to relevant storage nodes.
+ notify_unlock = Packets.NotifyUnlockInformation(ttid)
+ getByUUID = self.nm.getByUUID
+ for storage_uuid in txn.getUUIDList():
+ getByUUID(storage_uuid).getConnection().notify(notify_unlock)
+
+ # remove transaction from manager
+ self.tm.remove(tid)
+ self.setLastTransaction(tid)
+ self.executeQueuedEvent()
+
def getLastTransaction(self):
return self.last_transaction
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Wed Dec 22 18:10:02 2010
@@ -48,13 +48,8 @@ class StorageServiceHandler(BaseServiceH
# this is intentionaly placed after the raise because the last cell in a
# partition must not oudated to allows a cluster restart.
self.app.outdateAndBroadcastPartition()
- uuid = conn.getUUID()
- for tid, transaction in self.app.tm.items():
- # if a transaction is known, this means that it's being committed
- if transaction.forget(uuid):
- self._afterLock(tid)
- packing = self.app.packing
- if packing is not None:
+ self.app.tm.forget(conn.getUUID())
+ if self.app.packing is not None:
self.answerPack(conn, False)
def askLastIDs(self, conn):
@@ -68,9 +63,7 @@ class StorageServiceHandler(BaseServiceH
conn.answer(p)
def answerInformationLocked(self, conn, tid):
- uuid = conn.getUUID()
- app = self.app
- tm = app.tm
+ tm = self.app.tm
# If the given transaction ID is later than the last TID, the peer
# is crazy.
@@ -78,38 +71,7 @@ class StorageServiceHandler(BaseServiceH
raise ProtocolError('TID too big')
# transaction locked on this storage node
- if tm.lock(tid, uuid):
- self._afterLock(tid)
-
- def _afterLock(self, tid):
- # I have received all the lock answers now:
- # - send a Notify Transaction Finished to the initiated client node
- # - Invalidate Objects to the other client nodes
- app = self.app
- tm = app.tm
- t = tm[tid]
- ttid = t.getTTID()
- nm = app.nm
- transaction_node = t.getNode()
- invalidate_objects = Packets.InvalidateObjects(tid, t.getOIDList())
- answer_transaction_finished = Packets.AnswerTransactionFinished(ttid,
- tid)
- for client_node in nm.getClientList(only_identified=True):
- c = client_node.getConnection()
- if client_node is transaction_node:
- c.answer(answer_transaction_finished, msg_id=t.getMessageId())
- else:
- c.notify(invalidate_objects)
-
- # - Unlock Information to relevant storage nodes.
- notify_unlock = Packets.NotifyUnlockInformation(ttid)
- for storage_uuid in t.getUUIDList():
- nm.getByUUID(storage_uuid).getConnection().notify(notify_unlock)
-
- # remove transaction from manager
- tm.remove(tid)
- app.setLastTransaction(tid)
- app.executeQueuedEvent()
+ tm.lock(tid, conn.getUUID())
def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID())
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Wed Dec 22 18:10:02 2010
@@ -193,17 +193,19 @@ class TransactionManager(object):
_next_ttid = 0
- def __init__(self):
+ def __init__(self, on_commit):
# tid -> transaction
self._tid_dict = {}
# node -> transactions mapping
self._node_dict = {}
self._last_oid = None
+ self._on_commit = on_commit
def __getitem__(self, tid):
"""
Return the transaction object for this TID
"""
+ # XXX: used by unit tests only
return self._tid_dict[tid]
def __contains__(self, tid):
@@ -213,6 +215,7 @@ class TransactionManager(object):
return tid in self._tid_dict
def items(self):
+ # XXX: used by unit tests only
return self._tid_dict.items()
def getNextOIDList(self, num_oids):
@@ -386,7 +389,19 @@ class TransactionManager(object):
Returns True if all are now locked
"""
assert tid in self._tid_dict, "Transaction not started"
- return self._tid_dict[tid].lock(uuid)
+ txn = self._tid_dict[tid]
+ if txn.lock(uuid):
+ # all storage are locked
+ self._on_commit(tid, txn)
+
+ def forget(self, uuid):
+ """
+ A storage node has been lost, don't expect a reply from it for
+ current transactions
+ """
+ for tid, txn in self._tid_dict.items():
+ if txn.forget(uuid):
+ self._on_commit(tid, txn)
def abortFor(self, node):
"""
Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Wed Dec 22 18:10:02 2010
@@ -61,7 +61,8 @@ class testTransactionManager(NeoUnitTest
uuid_list = (uuid1, uuid2) = self.makeUUID(1), self.makeUUID(2)
client_uuid = self.makeUUID(3)
# create transaction manager
- txnman = TransactionManager()
+ callback = Mock()
+ txnman = TransactionManager(on_commit=callback)
self.assertFalse(txnman.hasPending())
self.assertEqual(txnman.getPendingList(), [])
# begin the transaction
@@ -78,8 +79,10 @@ class testTransactionManager(NeoUnitTest
self.assertEqual(txn.getUUIDList(), list(uuid_list))
self.assertEqual(txn.getOIDList(), list(oid_list))
# lock nodes
- self.assertFalse(txnman.lock(tid, uuid1))
- self.assertTrue(txnman.lock(tid, uuid2))
+ txnman.lock(tid, uuid1)
+ self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
+ txnman.lock(tid, uuid2)
+ self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
# transaction finished
txnman.remove(tid)
self.assertEqual(txnman.getPendingList(), [])
@@ -91,7 +94,7 @@ class testTransactionManager(NeoUnitTest
storage_1_uuid = self.makeUUID(1)
storage_2_uuid = self.makeUUID(2)
client_uuid = self.makeUUID(3)
- txnman = TransactionManager()
+ txnman = TransactionManager(lambda tid, txn: None)
# register 4 transactions made by two nodes
self.assertEqual(txnman.getPendingList(), [])
ttid1 = txnman.begin(client_uuid)
@@ -108,7 +111,7 @@ class testTransactionManager(NeoUnitTest
txnman.begin(client_uuid, self.getNextTID())
def test_getNextOIDList(self):
- txnman = TransactionManager()
+ txnman = TransactionManager(lambda tid, txn: None)
# must raise as we don"t have one
self.assertEqual(txnman.getLastOID(), None)
self.assertRaises(RuntimeError, txnman.getNextOIDList, 1)
@@ -129,7 +132,7 @@ class testTransactionManager(NeoUnitTest
oid_list = [self.makeOID(1), ]
client_uuid = self.makeUUID(3)
- tm = TransactionManager()
+ tm = TransactionManager(lambda tid, txn: None)
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
@@ -206,7 +209,7 @@ class testTransactionManager(NeoUnitTest
Note: this implementation might change later, to allow more paralelism.
"""
client_uuid = self.makeUUID(3)
- tm = TransactionManager()
+ tm = TransactionManager(lambda tid, txn: None)
# With a requested TID, lock spans from begin to remove
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
@@ -227,7 +230,7 @@ class testTransactionManager(NeoUnitTest
def testClientDisconectsAfterBegin(self):
client1_uuid = self.makeUUID(1)
client2_uuid = self.makeUUID(2)
- tm = TransactionManager()
+ tm = TransactionManager(lambda tid, txn: None)
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tm.begin(client1_uuid, tid1)
More information about the Neo-report
mailing list