[Neo-report] r2564 gregory - in /trunk/neo: master/ master/handlers/ tests/master/
nobody at svn.erp5.org
Wed Dec 22 18:10:15 CET 2010
Author: gregory
Date: Wed Dec 22 18:10:14 2010
New Revision: 2564
Log:
Serialize only the last part of the 2PC.
Modified:
trunk/neo/master/app.py
trunk/neo/master/handlers/client.py
trunk/neo/master/transactions.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testStorageHandler.py
trunk/neo/tests/master/testTransactions.py
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -45,8 +45,6 @@ class Application(object):
last_transaction = ZERO_TID
def __init__(self, config):
- self._queued_events = []
-
# always use default connector for now
self.connector_handler = getConnectorHandler()
@@ -584,9 +582,8 @@ class Application(object):
getByUUID(storage_uuid).getConnection().notify(notify_unlock)
# remove transaction from manager
- self.tm.remove(tid)
+ self.tm.remove(transaction_node.getUUID(), tid)
self.setLastTransaction(tid)
- self.executeQueuedEvent()
def getLastTransaction(self):
return self.last_transaction
@@ -605,17 +602,3 @@ class Application(object):
def isStorageReady(self, uuid):
return uuid in self.storage_readiness
- def queueEvent(self, func, conn, *args, **kw):
- msg_id = conn.getPeerId()
- self._queued_events.append((func, msg_id, conn, args, kw))
-
- def executeQueuedEvent(self):
- queue = self._queued_events
- while queue:
- func, msg_id, conn, args, kw = queue.pop(0)
- if conn.isAborted() or conn.isClosed():
- continue
- conn.setPeerId(msg_id)
- func(conn, *args, **kw)
- break
-
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -51,11 +51,8 @@ class ClientServiceHandler(MasterHandler
"""
A client request a TID, nothing is kept about it until the finish.
"""
- try:
- conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
- conn.getUUID(), tid)))
- except DelayedError:
- self.app.queueEvent(self.askBeginTransaction, conn, tid)
+ conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
+ conn.getUUID(), tid)))
def askNewOIDs(self, conn, num_oids):
app = self.app
@@ -88,13 +85,8 @@ class ClientServiceHandler(MasterHandler
partitions = app.pt.getPartitions()
peer_id = conn.getPeerId()
node = app.nm.getByUUID(conn.getUUID())
- try:
- tid = app.tm.prepare(node, ttid, partitions, oid_list,
- usable_uuid_set, peer_id)
- except DelayedError:
- app.queueEvent(self.askFinishTransaction, conn, ttid,
- oid_list)
- return
+ tid = app.tm.prepare(node, ttid, partitions, oid_list,
+ usable_uuid_set, peer_id)
# check if greater and foreign OID was stored
if app.tm.updateLastOID(oid_list):
@@ -124,7 +116,5 @@ class ClientServiceHandler(MasterHandler
self.app.getLastTransaction()))
def abortTransaction(self, conn, tid):
- app = self.app
- app.tm.remove(tid)
- app.executeQueuedEvent()
+ self.app.tm.remove(conn.getUUID(), tid)
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -187,10 +187,6 @@ class TransactionManager(object):
Manage current transactions
"""
_last_tid = ZERO_TID
- # Transaction serialisation
- # We don't need to use a real lock, as we are mono-threaded.
- _locked = None
-
_next_ttid = 0
def __init__(self, on_commit):
@@ -200,6 +196,7 @@ class TransactionManager(object):
self._node_dict = {}
self._last_oid = None
self._on_commit = on_commit
+ self._queue = []
def __getitem__(self, tid):
"""
@@ -336,46 +333,36 @@ class TransactionManager(object):
# No TID requested, generate a temporary one
tid = self.getTTID()
else:
- # TID requested, take commit lock immediately
- if self._locked is not None:
- raise DelayedError()
- self._locked = (uuid, tid)
+ self._queue.append((uuid, tid))
return tid
def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id):
"""
Prepare a transaction to be finished
"""
- locked = self._locked
- uuid = node.getUUID()
- if locked is not None and locked[1] == ttid:
- assert locked[0] == uuid
- # Transaction requested some TID upon begin, and it owns the commit
- # lock since then.
- tid = ttid
+ # XXX: not efficient but the list should be often small
+ for _, tid in self._queue:
+ if ttid == tid:
+ break
else:
- # Otherwise, acquire lock and allocate a new TID.
- if locked is not None:
- raise DelayedError()
tid = self._nextTID(ttid, divisor)
- self._locked = (uuid, tid)
-
+ self._queue.append((node.getUUID(), tid))
self.setLastTID(tid)
+ neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid))
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
self._tid_dict[tid] = txn
self._node_dict.setdefault(node, {})[tid] = txn
return tid
- def remove(self, tid):
+ def remove(self, uuid, tid):
"""
Remove a transaction, commited or aborted
"""
- locked = self._locked
- if locked is not None and tid == locked[1]:
- # If TID has the lock, release it.
- # It might legitimately not have the lock (ex: a transaction
- # aborting, which didn't request a TID upon begin)
- self._locked = None
+ try:
+ self._queue.remove((uuid, tid))
+ except ValueError:
+ # finish might not have been started
+ pass
tid_dict = self._tid_dict
if tid in tid_dict:
# ...and tried to finish
@@ -392,7 +379,7 @@ class TransactionManager(object):
txn = self._tid_dict[tid]
if txn.lock(uuid):
# all storage are locked
- self._on_commit(tid, txn)
+ self._unlockPending()
def forget(self, uuid):
"""
@@ -401,22 +388,37 @@ class TransactionManager(object):
"""
for tid, txn in self._tid_dict.items():
if txn.forget(uuid):
+ self._unlockPending()
+
+ def _unlockPending(self):
+ # unlock pending transactions
+ while self._queue:
+ tid = self._queue[0][1]
+ # _queue can contain un-prepared transactions
+ txn = self._tid_dict.get(tid, None)
+ if txn is not None and txn.locked():
+ self._queue.pop()
self._on_commit(tid, txn)
+ else:
+ break
def abortFor(self, node):
"""
Abort pending transactions initiated by a node
"""
- locked = self._locked
- if locked is not None and locked[0] == node.getUUID():
- self._locked = None
+ neo.logging.debug('Abort for %s', node)
# nothing to do
if node not in self._node_dict:
return
# remove transactions
+ uuid = node.getUUID()
remove = self.remove
for tid in self._node_dict[node].keys():
- remove(tid)
+ remove(uuid, tid)
+ # the code below is usefull only during an import
+ for nuuid, ntid in list(self._queue):
+ if nuuid == uuid:
+ self._queue.remove((uuid, tid))
# discard node entry
del self._node_dict[node]
Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -82,16 +82,6 @@ class MasterClientHandlerTests(NeoUnitTe
# Client asks for a TID
self.app.tm = tm_org
service.askBeginTransaction(conn, tid1)
- # If asking again for a TID, call is queued
- call_marker = []
- def queueEvent(*args, **kw):
- call_marker.append((args, kw))
- self.app.queueEvent = queueEvent
- service.askBeginTransaction(conn, tid2)
- self.assertEqual(len(call_marker), 1)
- args, kw = call_marker[0]
- self.assertEqual(kw, {})
- self.assertEqual(args, (service.askBeginTransaction, conn, tid2))
def test_08_askNewOIDs(self):
service = self.service
Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitT
[node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False)
- tm.remove(tid2)
+ tm.remove(node1.getUUID(), tid2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitT
[node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False)
- tm.remove(tid3)
+ tm.remove(node1.getUUID(), tid3)
def test_answerPack(self):
# Note: incomming status has no meaning here, so it's left to False.
Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Wed Dec 22 18:10:14 2010
@@ -84,7 +84,7 @@ class testTransactionManager(NeoUnitTest
txnman.lock(tid, uuid2)
self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
# transaction finished
- txnman.remove(tid)
+ txnman.remove(client_uuid, tid)
self.assertEqual(txnman.getPendingList(), [])
def testAbortFor(self):
@@ -108,7 +108,7 @@ class testTransactionManager(NeoUnitTest
self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending())
# ...and the lock is available
- txnman.begin(client_uuid, self.getNextTID())
+ txnman.begin(self.getNextTID())
def test_getNextOIDList(self):
txnman = TransactionManager(lambda tid, txn: None)
@@ -146,7 +146,7 @@ class testTransactionManager(NeoUnitTest
# t1 is over
self.assertTrue(t1.forget(storage_1_uuid))
self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
- tm.remove(tid1)
+ tm.remove(client_uuid, tid1)
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
@@ -160,7 +160,7 @@ class testTransactionManager(NeoUnitTest
self.assertFalse(t2.forget(storage_1_uuid))
self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
self.assertTrue(t2.lock(storage_2_uuid))
- tm.remove(tid2)
+ tm.remove(client_uuid, tid2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
@@ -174,7 +174,7 @@ class testTransactionManager(NeoUnitTest
self.assertFalse(t3.forget(storage_1_uuid))
self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
self.assertTrue(t3.lock(storage_2_uuid))
- tm.remove(tid3)
+ tm.remove(client_uuid, tid3)
def testTIDUtils(self):
"""
@@ -215,16 +215,13 @@ class testTransactionManager(NeoUnitTest
ttid2 = self.getNextTID()
tid1 = tm.begin(client_uuid, ttid1)
self.assertEqual(tid1, ttid1)
- self.assertRaises(DelayedError, tm.begin, client_uuid, ttid2)
- tm.remove(tid1)
- tm.remove(tm.begin(client_uuid, ttid2))
+ tm.remove(client_uuid, tid1)
# Without a requested TID, lock spans from prepare to remove only
ttid3 = tm.begin(client_uuid)
ttid4 = tm.begin(client_uuid) # Doesn't raise
node = Mock({'getUUID': client_uuid, '__hash__': 0})
tid4 = tm.prepare(node, ttid4, 1, [], [], 0)
- self.assertRaises(DelayedError, tm.prepare, node, ttid3, 1, [], [], 0)
- tm.remove(tid4)
+ tm.remove(client_uuid, tid4)
tm.prepare(node, ttid3, 1, [], [], 0)
def testClientDisconectsAfterBegin(self):
@@ -233,11 +230,10 @@ class testTransactionManager(NeoUnitTest
tm = TransactionManager(lambda tid, txn: None)
tid1 = self.getNextTID()
tid2 = self.getNextTID()
- tm.begin(client1_uuid, tid1)
- self.assertRaises(DelayedError, tm.begin, client2_uuid, tid2)
+ tm.begin(tid1)
node1 = Mock({'getUUID': client1_uuid, '__hash__': 0})
tm.abortFor(node1)
- tm.begin(client2_uuid, tid2)
+ tm.begin(tid2)
if __name__ == '__main__':
unittest.main()
More information about the Neo-report mailing list