[Neo-report] r2237 gregory - in /trunk/neo: ./ client/ master/ master/handlers/ tests/ tes...
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Aug 27 17:47:00 CEST 2010
Author: gregory
Date: Fri Aug 27 17:46:58 2010
New Revision: 2237
Log:
Request a TID only if not supplied by the ZODB.
Master transaction objects are instantiated during the finish phase only,
which means that any transaction known by the master is being committed.
Modified:
trunk/neo/client/app.py
trunk/neo/handler.py
trunk/neo/master/handlers/client.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/transactions.py
trunk/neo/protocol.py
trunk/neo/tests/client/testClientApp.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testStorageHandler.py
trunk/neo/tests/master/testTransactions.py
trunk/neo/tests/testConnection.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -564,16 +564,14 @@ class Application(object):
return
if self.local_var.txn is not None:
raise NeoException, 'local_var is not clean in tpc_begin'
- # ask the primary master to start a transaction, if no tid is supplied,
- # the master will supply us one. Otherwise the requested tid will be
- # used if possible.
- self.local_var.tid = None
- self._askPrimary(Packets.AskBeginTransaction(tid))
- if self.local_var.tid is None:
- raise NEOStorageError('tpc_begin failed')
+ # use the given TID or request a new one to the master
+ self.local_var.tid = tid
+ if tid is None:
+ self._askPrimary(Packets.AskBeginTransaction())
+ if self.local_var.tid is None:
+ raise NEOStorageError('tpc_begin failed')
self.local_var.txn = transaction
-
@profiler_decorator
def store(self, oid, serial, data, version, transaction):
"""Store object."""
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -196,7 +196,7 @@ class EventHandler(object):
def commitTransaction(self, conn, tid):
raise UnexpectedPacketError
- def askBeginTransaction(self, conn, tid):
+ def askBeginTransaction(self, conn):
raise UnexpectedPacketError
def answerBeginTransaction(self, conn, tid):
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -48,12 +48,14 @@ class ClientServiceHandler(MasterHandler
conn.answer(Packets.AnswerNodeInformation())
def abortTransaction(self, conn, tid):
- self.app.tm.remove(tid)
+ # nothing to remove.
+ pass
- def askBeginTransaction(self, conn, tid):
- node = self.app.nm.getByUUID(conn.getUUID())
- tid = self.app.tm.begin(node, tid)
- conn.answer(Packets.AnswerBeginTransaction(tid))
+ def askBeginTransaction(self, conn):
+ """
+ A client request a TID, nothing is kept about it until the finish.
+ """
+ conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin()))
def askNewOIDs(self, conn, num_oids):
conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
@@ -61,10 +63,6 @@ class ClientServiceHandler(MasterHandler
def askFinishTransaction(self, conn, tid, oid_list):
app = self.app
- # If the given transaction ID is later than the last TID, the peer
- # is crazy.
- if tid > self.app.tm.getLastTID():
- raise ProtocolError('TID too big')
# Collect partitions related to this transaction.
getPartition = app.pt.getPartition
@@ -91,5 +89,6 @@ class ClientServiceHandler(MasterHandler
node.ask(p, timeout=60)
used_uuid_set.add(node.getUUID())
- app.tm.prepare(tid, oid_list, used_uuid_set, conn.getPeerId())
+ node = self.app.nm.getByUUID(conn.getUUID())
+ app.tm.prepare(node, tid, oid_list, used_uuid_set, conn.getPeerId())
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -43,11 +43,8 @@ class StorageServiceHandler(BaseServiceH
self.app.outdateAndBroadcastPartition()
uuid = conn.getUUID()
for tid, transaction in self.app.tm.items():
- # If this transaction was not "prepared" (see askFinishTransaction)
- # there is nothing to cleanup on it (it doesn't have the list of
- # involved storage nodes yet). As such transaction would be detected
- # as locked, we must also prevent _afterLock from being called.
- if transaction.isPrepared() and transaction.forget(uuid):
+ # if a transaction is known, this means that it's being committed
+ if transaction.forget(uuid):
self._afterLock(tid)
def askLastIDs(self, conn):
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -26,15 +26,16 @@ class Transaction(object):
A pending transaction
"""
- _prepared = False
-
- def __init__(self, node, tid):
+ def __init__(self, node, tid, oid_list, uuid_list, msg_id):
+ """
+ Prepare the transaction, set OIDs and UUIDs related to it
+ """
self._node = node
self._tid = tid
- self._oid_list = []
- self._msg_id = None
+ self._oid_list = oid_list
+ self._msg_id = msg_id
# uuid dict hold flag to known who has locked the transaction
- self._uuid_dict = {}
+ self._uuid_dict = dict.fromkeys(uuid_list, False)
self._birth = time()
def __repr__(self):
@@ -60,12 +61,6 @@ class Transaction(object):
"""
return self._tid
- def isPrepared(self):
- """
-
- """
- return self._prepared
-
def getMessageId(self):
"""
Returns the packet ID to use in the answer
@@ -85,17 +80,6 @@ class Transaction(object):
return list(self._oid_list)
- def prepare(self, oid_list, uuid_list, msg_id):
- """
- Prepare the transaction, set OIDs and UUIDs related to it
- """
- assert not self._oid_list
- assert not self._uuid_dict
- self._oid_list = oid_list
- self._uuid_dict = dict.fromkeys(uuid_list, False)
- self._msg_id = msg_id
- self._prepared = True
-
def forget(self, uuid):
"""
Given storage was lost while waiting for its lock, stop waiting
@@ -239,38 +223,25 @@ class TransactionManager(object):
"""
return self._tid_dict.keys()
- def begin(self, node, tid):
+ def begin(self):
"""
- Begin a new transaction
+ Generate a new TID
"""
- assert node is not None
- if tid is not None and tid < self._last_tid:
- logging.warn('Transaction began with a decreased TID: %s, ' \
- 'expected at least %s', tid, self._last_tid)
- if tid is None:
- # give a TID
- tid = self._nextTID()
- txn = Transaction(node, tid)
- self._tid_dict[tid] = txn
- self._node_dict.setdefault(node, {})[tid] = txn
- self.setLastTID(tid)
- return tid
+ return self._nextTID()
- def prepare(self, tid, oid_list, uuid_list, msg_id):
+ def prepare(self, node, tid, oid_list, uuid_list, msg_id):
"""
Prepare a transaction to be finished
"""
- assert tid in self._tid_dict, "Transaction not started"
- txn = self._tid_dict[tid]
- txn.prepare(oid_list, uuid_list, msg_id)
+ self.setLastTID(tid)
+ txn = Transaction(node, tid, oid_list, uuid_list, msg_id)
+ self._tid_dict[tid] = txn
+ self._node_dict.setdefault(node, {})[tid] = txn
def remove(self, tid):
"""
Remove a transaction, commited or aborted
"""
- if tid not in self._tid_dict:
- logging.warn('aborting transaction %s does not exist', dump(tid))
- return
node = self._tid_dict[tid].getNode()
# remove both mappings, node will be removed in abortFor
del self._tid_dict[tid]
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -715,12 +715,6 @@ class AskBeginTransaction(Packet):
"""
Ask to begin a new transaction. C -> PM.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
class AnswerBeginTransaction(Packet):
"""
Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -606,7 +606,6 @@ class ClientApplicationTests(NeoTestBase
# will check if there was just one call/packet :
self.checkNotifyPacket(conn1, Packets.AbortTransaction)
self.checkNotifyPacket(conn2, Packets.AbortTransaction)
- self.checkNotifyPacket(app.master_conn, Packets.AbortTransaction)
self.assertEquals(app.local_var.tid, None)
self.assertEquals(app.local_var.txn, None)
self.assertEquals(app.local_var.data_dict, {})
@@ -672,7 +671,6 @@ class ClientApplicationTests(NeoTestBase
app.cp = ConnectionPool()
# abort must be sent to storage 1 and 2
app.tpc_abort(txn)
- self.checkAbortTransaction(app.master_conn)
self.checkAbortTransaction(conn2)
self.checkAbortTransaction(conn3)
@@ -1040,7 +1038,7 @@ class ClientApplicationTests(NeoTestBase
def _waitMessage_hook(app, conn, msg_id, handler=None):
self.test_ok = True
_waitMessage_old = Application._waitMessage
- packet = Packets.AskBeginTransaction(None)
+ packet = Packets.AskBeginTransaction()
packet.setId(0)
Application._waitMessage = _waitMessage_hook
try:
@@ -1066,7 +1064,7 @@ class ClientApplicationTests(NeoTestBase
self.test_ok = True
_waitMessage_old = Application._waitMessage
Application._waitMessage = _waitMessage_hook
- packet = Packets.AskBeginTransaction(None)
+ packet = Packets.AskBeginTransaction()
packet.setId(0)
try:
app._askPrimary(packet)
Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -72,11 +72,8 @@ class MasterClientHandlerTests(NeoTestBa
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
- service.askBeginTransaction(conn, None)
+ service.askBeginTransaction(conn)
self.assertTrue(ltid < self.app.tm.getLastTID())
- self.assertEqual(len(self.app.tm.getPendingList()), 1)
- tid = self.app.tm.getPendingList()[0]
- self.assertEquals(tid, self.app.tm.getLastTID())
def test_08_askNewOIDs(self):
service = self.service
@@ -97,18 +94,6 @@ class MasterClientHandlerTests(NeoTestBa
def test_09_askFinishTransaction(self):
service = self.service
uuid = self.identifyToMasterNode()
- # give an older tid than the PMN known, must abort
- client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
- conn = self.getFakeConnection(client_uuid, self.client_address)
- oid_list = []
- upper, lower = unpack('!LL', self.app.tm.getLastTID())
- new_tid = pack('!LL', upper, lower + 10)
- self.checkProtocolErrorRaised(service.askFinishTransaction, conn,
- new_tid, oid_list)
- old_node = self.app.nm.getByUUID(uuid)
- self.app.nm.remove(old_node)
- self.app.pt.dropNode(old_node)
-
# do the right job
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
storage_uuid = self.identifyToMasterNode()
@@ -119,7 +104,7 @@ class MasterClientHandlerTests(NeoTestBa
'getPartition': 0,
'getCellList': [Mock({'getUUID': storage_uuid})],
})
- service.askBeginTransaction(conn, None)
+ service.askBeginTransaction(conn)
oid_list = []
tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address)
@@ -169,17 +154,12 @@ class MasterClientHandlerTests(NeoTestBa
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
lptid = self.app.pt.getID()
- self.service.askBeginTransaction(conn, None)
- self.service.askBeginTransaction(conn, None)
- self.service.askBeginTransaction(conn, None)
self.assertEquals(self.app.nm.getByUUID(client_uuid).getState(),
NodeStates.RUNNING)
- self.assertEquals(len(self.app.tm.getPendingList()), 3)
method(conn)
# node must be have been remove, and no more transaction must remains
self.assertEquals(self.app.nm.getByUUID(client_uuid), None)
self.assertEquals(lptid, self.app.pt.getID())
- self.assertFalse(self.app.tm.hasPending())
def test_15_peerBroken(self):
self.__testWithMethod(self.service.peerBroken, NodeStates.BROKEN)
Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -104,8 +104,8 @@ class MasterStorageHandlerTests(NeoTestB
oid_list = self.getOID(), self.getOID()
msg_id = 1
# register a transaction
- tid = self.app.tm.begin(client_1, None)
- self.app.tm.prepare(tid, oid_list, uuid_list, msg_id)
+ tid = self.app.tm.begin()
+ self.app.tm.prepare(client_1, tid, oid_list, uuid_list, msg_id)
self.assertTrue(tid in self.app.tm)
# the first storage acknowledge the lock
self.service.answerInformationLocked(storage_conn_1, tid)
@@ -148,9 +148,13 @@ class MasterStorageHandlerTests(NeoTestB
# create some transaction
node, conn = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
port=self.client_port)
- self.client_handler.askBeginTransaction(conn, None)
- self.client_handler.askBeginTransaction(conn, None)
- self.client_handler.askBeginTransaction(conn, None)
+ def create_transaction(index):
+ tid = self.getNextTID()
+ oid_list = [self.getOID(index)]
+ self.app.tm.prepare(node, tid, oid_list, [node.getUUID()], index)
+ create_transaction(1)
+ create_transaction(2)
+ create_transaction(3)
conn = self.getFakeConnection(node.getUUID(), self.storage_address)
service.askUnfinishedTransactions(conn)
packet = self.checkAnswerUnfinishedTransactions(conn)
@@ -214,17 +218,14 @@ class MasterStorageHandlerTests(NeoTestB
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
- tm.begin(client1, tid1)
- tm.prepare(tid1, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_1)
+ tm.prepare(client1, tid1, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(tid1, node2.getUUID())
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
- tm.begin(client2, tid2)
- tm.prepare(tid2, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_2)
+ tm.prepare(client2, tid2, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
- tm.begin(client3, tid3)
- tm.prepare(tid3, oid_list, [node2.getUUID(), ], msg_id_3)
+ tm.prepare(client3, tid3, oid_list, [node2.getUUID(), ], msg_id_3)
# Assert initial state
self.checkNoPacketSent(cconn1)
Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -37,13 +37,13 @@ class testTransactionManager(NeoTestBase
# test data
node = Mock({'__repr__': 'Node'})
tid = self.makeTID(1)
- oid_list = (oid1, oid2) = (self.makeOID(1), self.makeOID(2))
- uuid_list = (uuid1, uuid2) = (self.makeUUID(1), self.makeUUID(2))
+ oid_list = (oid1, oid2) = [self.makeOID(1), self.makeOID(2)]
+ uuid_list = (uuid1, uuid2) = [self.makeUUID(1), self.makeUUID(2)]
msg_id = 1
# create transaction object
- txn = Transaction(node, tid)
- self.assertEqual(txn.getUUIDList(), [])
- txn.prepare(oid_list, uuid_list, msg_id)
+ txn = Transaction(node, tid, oid_list, uuid_list, msg_id)
+ self.assertEqual(txn.getUUIDList(), uuid_list)
+ self.assertEqual(txn.getOIDList(), oid_list)
# lock nodes one by one
self.assertFalse(txn.lock(uuid1))
self.assertTrue(txn.lock(uuid2))
@@ -61,14 +61,15 @@ class testTransactionManager(NeoTestBase
self.assertFalse(txnman.hasPending())
self.assertEqual(txnman.getPendingList(), [])
# begin the transaction
- tid = txnman.begin(node, None)
+ tid = txnman.begin()
self.assertTrue(tid is not None)
+ self.assertFalse(txnman.hasPending())
+ self.assertEqual(len(txnman.getPendingList()), 0)
+ # prepare the transaction
+ txnman.prepare(node, tid, oid_list, uuid_list, msg_id)
self.assertTrue(txnman.hasPending())
- self.assertEqual(len(txnman.getPendingList()), 1)
self.assertEqual(txnman.getPendingList()[0], tid)
self.assertEqual(txnman[tid].getTID(), tid)
- # prepare the transaction
- txnman.prepare(tid, oid_list, uuid_list, msg_id)
txn = txnman[tid]
self.assertEqual(txn.getUUIDList(), list(uuid_list))
self.assertEqual(txn.getOIDList(), list(oid_list))
@@ -82,12 +83,19 @@ class testTransactionManager(NeoTestBase
def testAbortFor(self):
node1 = Mock({'__hash__': 1})
node2 = Mock({'__hash__': 2})
+ oid_list = [self.makeOID(1), ]
+ storage_1_uuid = self.makeUUID(1)
+ storage_2_uuid = self.makeUUID(2)
txnman = TransactionManager()
# register 4 transactions made by two nodes
- tid11 = txnman.begin(node1, None)
- tid12 = txnman.begin(node1, None)
- tid21 = txnman.begin(node2, None)
- tid22 = txnman.begin(node2, None)
+ tid11 = txnman.begin()
+ txnman.prepare(node1, tid11, oid_list, [storage_1_uuid], 1)
+ tid12 = txnman.begin()
+ txnman.prepare(node1, tid12, oid_list, [storage_1_uuid], 2)
+ tid21 = txnman.begin()
+ txnman.prepare(node2, tid21, oid_list, [storage_2_uuid], 3)
+ tid22 = txnman.begin()
+ txnman.prepare(node2, tid22, oid_list, [storage_2_uuid], 4)
self.assertTrue(tid11 < tid12 < tid21 < tid22)
self.assertEqual(len(txnman.getPendingList()), 4)
# abort transactions of one node
@@ -120,7 +128,7 @@ class testTransactionManager(NeoTestBase
self.assertEqual(txnman.getLastTID(), None)
# first transaction
node1 = Mock({'__hash__': 1})
- tid1 = txnman.begin(node1, None)
+ tid1 = txnman.begin()
self.assertTrue(tid1 is not None)
self.assertEqual(txnman.getLastTID(), tid1)
# set a new last TID
@@ -130,7 +138,7 @@ class testTransactionManager(NeoTestBase
self.assertTrue(ntid > tid1)
# new trancation
node2 = Mock({'__hash__': 2})
- tid2 = txnman.begin(node2, None)
+ tid2 = txnman.begin()
self.assertTrue(tid2 is not None)
self.assertTrue(tid2 > ntid > tid1)
@@ -140,26 +148,23 @@ class testTransactionManager(NeoTestBase
client3 = Mock({'__hash__': 3})
storage_1_uuid = self.makeUUID(1)
storage_2_uuid = self.makeUUID(2)
- tid1 = self.makeTID(1)
- tid2 = self.makeTID(2)
- tid3 = self.makeTID(3)
oid_list = [self.makeOID(1), ]
tm = TransactionManager()
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
- tm.begin(client1, tid1)
- tm.prepare(tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
+ tid1 = tm.begin()
+ tm.prepare(client1, tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(tid1, storage_2_uuid)
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
- tm.begin(client2, tid2)
- tm.prepare(tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
+ tid2 = tm.begin()
+ tm.prepare(client2, tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
- tm.begin(client3, tid3)
- tm.prepare(tid3, oid_list, [storage_2_uuid, ], msg_id_3)
+ tid3 = tm.begin()
+ tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
t1 = tm[tid1]
t2 = tm[tid2]
Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -852,7 +852,7 @@ class HandlerSwitcherTests(NeoTestBase):
return packet
def _makeRequest(self, msg_id):
- packet = Packets.AskBeginTransaction(self.getNextTID())
+ packet = Packets.AskBeginTransaction()
packet.setId(msg_id)
return packet
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Fri Aug 27 17:46:58 2010
@@ -238,14 +238,8 @@ class ProtocolTests(NeoTestBase):
def test_32_askBeginTransaction(self):
- # try with an invalid TID, None must be returned
- tid = INVALID_TID
- p = Packets.AskBeginTransaction(tid)
- self.assertEqual(p.decode(), (None, ))
- # and with another TID
- tid = '\1' * 8
- p = Packets.AskBeginTransaction(tid)
- self.assertEqual(p.decode(), (tid, ))
+ p = Packets.AskBeginTransaction()
+ self.assertEqual(p.decode(), ())
def test_33_answerBeginTransaction(self):
tid = self.getNextTID()
More information about the Neo-report
mailing list