[Neo-report] r2613 gregory - in /trunk/neo: ./ master/ master/handlers/ storage/handlers/ ...
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Jan 11 16:49:16 CET 2011
Author: gregory
Date: Tue Jan 11 16:49:16 2011
New Revision: 2613
Log:
Master transaction manager uses TTID as index.
- AnswerInformationLocked gives ttid instead of tid
- Master transaction manager always uses ttid in data structures
- It no longer makes sense to check if the tid is greater than the last
generated one, as it never comes back from a storage; just check that the
ttid is known by the transaction manager.
- Rename all tid variables that now hold a ttid
- The transaction manager's queue contains ttids, but they point to
transactions with increasing tids, to keep commit order.
- Adjust tests
Modified:
trunk/neo/handler.py
trunk/neo/master/app.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/transactions.py
trunk/neo/storage/handlers/master.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testStorageHandler.py
trunk/neo/tests/master/testTransactions.py
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -220,7 +220,7 @@ class EventHandler(object):
def askLockInformation(self, conn, ttid, tid, oid_list):
raise UnexpectedPacketError
- def answerInformationLocked(self, conn, tid):
+ def answerInformationLocked(self, conn, ttid):
raise UnexpectedPacketError
def invalidateObjects(self, conn, tid, oid_list):
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -560,11 +560,12 @@ class Application(object):
neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
return (uuid, node, state, handler, node_ctor)
- def onTransactionCommitted(self, tid, txn):
+ def onTransactionCommitted(self, txn):
# I have received all the lock answers now:
# - send a Notify Transaction Finished to the initiated client node
# - Invalidate Objects to the other client nodes
ttid = txn.getTTID()
+ tid = txn.getTID()
transaction_node = txn.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
transaction_finished = Packets.AnswerTransactionFinished(ttid, tid)
@@ -582,7 +583,7 @@ class Application(object):
getByUUID(storage_uuid).getConnection().notify(notify_unlock)
# remove transaction from manager
- self.tm.remove(transaction_node.getUUID(), tid)
+ self.tm.remove(transaction_node.getUUID(), ttid)
self.setLastTransaction(tid)
def getLastTransaction(self):
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -62,16 +62,12 @@ class StorageServiceHandler(BaseServiceH
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
conn.answer(p)
- def answerInformationLocked(self, conn, tid):
+ def answerInformationLocked(self, conn, ttid):
tm = self.app.tm
-
- # If the given transaction ID is later than the last TID, the peer
- # is crazy.
- if tid > tm.getLastTID():
- raise ProtocolError('TID too big')
-
+ if ttid not in tm:
+ raise ProtocolError('Unknown transaction')
# transaction locked on this storage node
- tm.lock(tid, conn.getUUID())
+ self.app.tm.lock(ttid, conn.getUUID())
def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID())
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -190,26 +190,27 @@ class TransactionManager(object):
_next_ttid = 0
def __init__(self, on_commit):
- # tid -> transaction
- self._tid_dict = {}
+ # ttid -> transaction
+ self._ttid_dict = {}
# node -> transactions mapping
self._node_dict = {}
self._last_oid = None
self._on_commit = on_commit
+ # queue filled with ttids pointing to transactions with increasing tids
self._queue = []
- def __getitem__(self, tid):
+ def __getitem__(self, ttid):
"""
Return the transaction object for this TID
"""
# XXX: used by unit tests only
- return self._tid_dict[tid]
+ return self._ttid_dict[ttid]
- def __contains__(self, tid):
+ def __contains__(self, ttid):
"""
Returns True if this is a pending transaction
"""
- return tid in self._tid_dict
+ return ttid in self._ttid_dict
def getNextOIDList(self, num_oids):
""" Generate a new OID list """
@@ -306,20 +307,20 @@ class TransactionManager(object):
Discard all manager content
This doesn't reset the last TID.
"""
- self._tid_dict = {}
+ self._ttid_dict = {}
self._node_dict = {}
def hasPending(self):
"""
Returns True if some transactions are pending
"""
- return bool(self._tid_dict)
+ return bool(self._ttid_dict)
def getPendingList(self):
"""
Return the list of pending transaction IDs
"""
- return self._tid_dict.keys()
+ return [txn.getTID() for txn in self._ttid_dict.values()]
def begin(self, uuid, tid=None):
"""
@@ -345,38 +346,41 @@ class TransactionManager(object):
break
else:
tid = self._nextTID(ttid, divisor)
- self._queue.append((node.getUUID(), tid))
+ self._queue.append((node.getUUID(), ttid))
neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid))
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
- self._tid_dict[tid] = txn
- self._node_dict.setdefault(node, {})[tid] = txn
+ self._ttid_dict[ttid] = txn
+ self._node_dict.setdefault(node, {})[ttid] = txn
return tid
- def remove(self, uuid, tid):
+ def remove(self, uuid, ttid):
"""
Remove a transaction, commited or aborted
"""
try:
- self._queue.remove((uuid, tid))
+ # only in case of an import:
+ self._queue.remove((uuid, ttid))
except ValueError:
# finish might not have been started
pass
- tid_dict = self._tid_dict
- if tid in tid_dict:
+ ttid_dict = self._ttid_dict
+ if ttid in ttid_dict:
+ txn = ttid_dict[ttid]
+ tid = txn.getTID()
+ node = txn.getNode()
# ...and tried to finish
- node = tid_dict[tid].getNode()
- del tid_dict[tid]
- del self._node_dict[node][tid]
+ del ttid_dict[ttid]
+ del self._node_dict[node][ttid]
- def lock(self, tid, uuid):
+ def lock(self, ttid, uuid):
"""
Set that a node has locked the transaction.
If transaction is completely locked, calls function given at
instanciation time.
"""
- assert tid in self._tid_dict, "Transaction not started"
- txn = self._tid_dict[tid]
- if txn.lock(uuid) and self._queue[0][1] == tid:
+ assert ttid in self._ttid_dict, "Transaction not started"
+ txn = self._ttid_dict[ttid]
+ if txn.lock(uuid) and self._queue[0][1] == ttid:
# all storage are locked and we unlock the commit queue
self._unlockPending()
@@ -387,8 +391,8 @@ class TransactionManager(object):
"""
unlock = False
# iterate over a copy because _unlockPending may alter the dict
- for tid, txn in self._tid_dict.items():
- if txn.forget(uuid) and self._queue[0][1] == tid:
+ for ttid, txn in self._ttid_dict.items():
+ if txn.forget(uuid) and self._queue[0][1] == ttid:
unlock = True
if unlock:
self._unlockPending()
@@ -399,15 +403,15 @@ class TransactionManager(object):
pop = queue.pop
insert = queue.insert
on_commit = self._on_commit
- get = self._tid_dict.get
+ get = self._ttid_dict.get
while queue:
- uuid, tid = pop(0)
- txn = get(tid, None)
+ uuid, ttid = pop(0)
+ txn = get(ttid, None)
# _queue can contain un-prepared transactions
if txn is not None and txn.locked():
- on_commit(tid, txn)
+ on_commit(txn)
else:
- insert(0, (uuid, tid))
+ insert(0, (uuid, ttid))
break
def abortFor(self, node):
@@ -423,13 +427,13 @@ class TransactionManager(object):
if node in self._node_dict:
# remove transactions
remove = self.remove
- for tid in self._node_dict[node].keys():
- remove(uuid, tid)
+ for ttid in self._node_dict[node].keys():
+ remove(uuid, ttid)
# discard node entry
del self._node_dict[node]
def log(self):
neo.logging.info('Transactions:')
- for txn in self._tid_dict.itervalues():
+ for txn in self._ttid_dict.itervalues():
neo.logging.info(' %r', txn)
Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -58,7 +58,7 @@ class MasterOperationHandler(BaseMasterH
raise ProtocolError('Unknown transaction')
self.app.tm.lock(ttid, tid, oid_list)
if not conn.isClosed():
- conn.answer(Packets.AnswerInformationLocked(tid))
+ conn.answer(Packets.AnswerInformationLocked(ttid))
def notifyUnlockInformation(self, conn, ttid):
if not ttid in self.app.tm:
Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -127,24 +127,24 @@ class MasterClientHandlerTests(NeoUnitTe
],
'getPartitions': 2,
})
- service.askBeginTransaction(conn, None)
+ ttid = self.getNextTID()
+ service.askBeginTransaction(conn, ttid)
oid_list = []
- tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
# No packet sent if storage node is not ready
self.assertFalse(self.app.isStorageReady(storage_uuid))
- service.askFinishTransaction(conn, tid, oid_list)
+ service.askFinishTransaction(conn, ttid, oid_list)
self.checkNoPacketSent(storage_conn)
self.app.tm.abortFor(self.app.nm.getByUUID(client_uuid))
# ...but AskLockInformation is sent if it is ready
self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid))
- service.askFinishTransaction(conn, tid, oid_list)
+ service.askFinishTransaction(conn, ttid, oid_list)
self.checkAskLockInformation(storage_conn)
self.assertEquals(len(self.app.tm.getPendingList()), 1)
- apptid = self.app.tm.getPendingList()[0]
- txn = self.app.tm[apptid]
+ txn = self.app.tm[ttid]
+ self.assertEquals(txn.getTID(), self.app.tm.getPendingList()[0])
self.assertEquals(len(txn.getOIDList()), 0)
self.assertEquals(len(txn.getUUIDList()), 1)
Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -104,15 +104,15 @@ class MasterStorageHandlerTests(NeoUnitT
ttid = self.app.tm.begin(client_1.getUUID())
tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list,
msg_id)
- self.assertTrue(tid in self.app.tm)
+ self.assertTrue(ttid in self.app.tm)
# the first storage acknowledge the lock
- self.service.answerInformationLocked(storage_conn_1, tid)
+ self.service.answerInformationLocked(storage_conn_1, ttid)
self.checkNoPacketSent(client_conn_1)
self.checkNoPacketSent(client_conn_2)
self.checkNoPacketSent(storage_conn_1)
self.checkNoPacketSent(storage_conn_2)
# then the second
- self.service.answerInformationLocked(storage_conn_2, tid)
+ self.service.answerInformationLocked(storage_conn_2, ttid)
self.checkAnswerTransactionFinished(client_conn_1)
self.checkInvalidateObjects(client_conn_2)
self.checkNotifyUnlockInformation(storage_conn_1)
@@ -211,7 +211,7 @@ class MasterStorageHandlerTests(NeoUnitT
ttid1 = tm.begin(node1.getUUID())
tid1 = tm.prepare(client1, ttid1, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_1)
- tm.lock(tid1, node2.getUUID())
+ tm.lock(ttid1, node2.getUUID())
self.checkNoPacketSent(cconn1)
# Storage 1 dies
node1.setTemporarilyDown()
@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitT
[node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False)
- tm.remove(node1.getUUID(), tid2)
+ tm.remove(node1.getUUID(), ttid2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitT
[node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False)
- tm.remove(node1.getUUID(), tid3)
+ tm.remove(node1.getUUID(), ttid3)
def test_answerPack(self):
# Note: incomming status has no meaning here, so it's left to False.
Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Tue Jan 11 16:49:16 2011
@@ -79,17 +79,17 @@ class testTransactionManager(NeoUnitTest
tid = txnman.prepare(node, ttid, 1, oid_list, uuid_list, msg_id)
self.assertTrue(txnman.hasPending())
self.assertEqual(txnman.getPendingList()[0], tid)
- self.assertEqual(txnman[tid].getTID(), tid)
- txn = txnman[tid]
+ txn = txnman[ttid]
+ self.assertEqual(txn.getTID(), tid)
self.assertEqual(txn.getUUIDList(), list(uuid_list))
self.assertEqual(txn.getOIDList(), list(oid_list))
# lock nodes
- txnman.lock(tid, uuid1)
+ txnman.lock(ttid, uuid1)
self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
- txnman.lock(tid, uuid2)
+ txnman.lock(ttid, uuid2)
self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
# transaction finished
- txnman.remove(client_uuid, tid)
+ txnman.remove(client_uuid, ttid)
self.assertEqual(txnman.getPendingList(), [])
def testAbortFor(self):
@@ -144,8 +144,8 @@ class testTransactionManager(NeoUnitTest
ttid1 = tm.begin(client_uuid)
tid1 = tm.prepare(client1, ttid1, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_1)
- tm.lock(tid1, storage_2_uuid)
- t1 = tm[tid1]
+ tm.lock(ttid1, storage_2_uuid)
+ t1 = tm[ttid1]
self.assertFalse(t1.locked())
# Storage 1 dies:
# t1 is over
@@ -158,7 +158,7 @@ class testTransactionManager(NeoUnitTest
ttid2 = tm.begin(client_uuid)
tid2 = tm.prepare(client2, ttid2, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_2)
- t2 = tm[tid2]
+ t2 = tm[ttid2]
self.assertFalse(t2.locked())
# Storage 1 dies:
# t2 still waits for storage 2
@@ -172,7 +172,7 @@ class testTransactionManager(NeoUnitTest
ttid3 = tm.begin(client_uuid)
tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ],
msg_id_3)
- t3 = tm[tid3]
+ t3 = tm[ttid3]
self.assertFalse(t3.locked())
# Storage 1 dies:
# t3 doesn't care
@@ -249,10 +249,10 @@ class testTransactionManager(NeoUnitTest
ttid2 = tm.begin(uuid2)
tid1 = tm.prepare(node1, ttid1, 1, [], [storage_uuid], 0)
tid2 = tm.prepare(node2, ttid2, 1, [], [storage_uuid], 0)
- tm.lock(tid2, storage_uuid)
+ tm.lock(ttid2, storage_uuid)
# txn 2 is still blocked by txn 1
self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
- tm.lock(tid1, storage_uuid)
+ tm.lock(ttid1, storage_uuid)
# both transactions are unlocked when txn 1 is fully locked
self.assertEqual(len(callback.getNamedCalls('__call__')), 2)
More information about the Neo-report
mailing list