[Neo-report] r2534 vincent - in /trunk/neo: ./ client/ client/handlers/ master/ master/han...
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Dec 14 16:57:01 CET 2010
Author: vincent
Date: Tue Dec 14 16:57:01 2010
New Revision: 2534
Log:
Shorten 2PC lock to cover only tpc_finish.
This allows parallel execution of tpc_begin, stores & related conflict
resolution and tpc_vote for different transactions.
This requires a ZODB extension that allows the TID to be kept secret until
tpc_finish (i.e., so that tpc_vote is not required to return a TID for each
stored object).
Modified:
trunk/neo/client/app.py
trunk/neo/client/handlers/master.py
trunk/neo/handler.py
trunk/neo/master/handlers/client.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/transactions.py
trunk/neo/protocol.py
trunk/neo/storage/handlers/master.py
trunk/neo/storage/transactions.py
trunk/neo/tests/client/testClientApp.py
trunk/neo/tests/client/testMasterHandler.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testStorageHandler.py
trunk/neo/tests/master/testTransactions.py
trunk/neo/tests/storage/testMasterHandler.py
trunk/neo/tests/storage/testTransactions.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -882,8 +882,6 @@ class Application(object):
raise NEOStorageError('tpc_store failed')
elif oid in resolved_oid_set:
append((oid, ResolvedSerial))
- else:
- append((oid, tid))
return result
@profiler_decorator
@@ -975,16 +973,17 @@ class Application(object):
self.tpc_vote(transaction, tryToResolveConflict)
self._load_lock_acquire()
try:
+ # Call finish on master
+ oid_list = local_var.data_list
+ p = Packets.AskFinishTransaction(local_var.tid, oid_list)
+ self._askPrimary(p)
+
+ # From now on, self.local_var.tid holds the "real" TID.
tid = local_var.tid
# Call function given by ZODB
if f is not None:
f(tid)
- # Call finish on master
- oid_list = local_var.data_list
- p = Packets.AskFinishTransaction(tid, oid_list)
- self._askPrimary(p)
-
# Update cache
self._cache_lock_acquire()
try:
@@ -1280,26 +1279,20 @@ class Application(object):
@profiler_decorator
def importFrom(self, source, start, stop, tryToResolveConflict):
serials = {}
- def updateLastSerial(oid, result):
- if result:
- if isinstance(result, str):
- assert oid is not None
- serials[oid] = result
- else:
- for oid, serial in result:
- assert isinstance(serial, str), serial
- serials[oid] = serial
transaction_iter = source.iterator(start, stop)
for transaction in transaction_iter:
- self.tpc_begin(transaction, transaction.tid, transaction.status)
+ tid = transaction.tid
+ self.tpc_begin(transaction, tid, transaction.status)
for r in transaction:
- pre = serials.get(r.oid, None)
+ oid = r.oid
+ pre = serials.get(oid, None)
# TODO: bypass conflict resolution, locks...
- result = self.store(r.oid, pre, r.data, r.version, transaction)
- updateLastSerial(r.oid, result)
- updateLastSerial(None, self.tpc_vote(transaction,
- tryToResolveConflict))
- self.tpc_finish(transaction, tryToResolveConflict)
+ self.store(oid, pre, r.data, r.version, transaction)
+ serials[oid] = tid
+ conflicted = self.tpc_vote(transaction, tryToResolveConflict)
+ assert not conflicted, conflicted
+ real_tid = self.tpc_finish(transaction, tryToResolveConflict)
+ assert real_tid == tid, (real_tid, tid)
transaction_iter.close()
def iterator(self, start=None, stop=None):
Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -163,9 +163,10 @@ class PrimaryAnswersHandler(AnswerBaseHa
def answerNewOIDs(self, conn, oid_list):
self.app.new_oid_list = oid_list
- def answerTransactionFinished(self, conn, tid):
- if tid != self.app.getTID():
+ def answerTransactionFinished(self, conn, ttid, tid):
+ if ttid != self.app.getTID():
raise NEOStorageError('Wrong TID, transaction not started')
+ self.app.setTID(tid)
def answerPack(self, conn, status):
if not status:
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -211,10 +211,10 @@ class EventHandler(object):
def askFinishTransaction(self, conn, tid, oid_list):
raise UnexpectedPacketError
- def answerTransactionFinished(self, conn, tid):
+ def answerTransactionFinished(self, conn, ttid, tid):
raise UnexpectedPacketError
- def askLockInformation(self, conn, tid, oid_list):
+ def askLockInformation(self, conn, ttid, tid, oid_list):
raise UnexpectedPacketError
def answerInformationLocked(self, conn, tid):
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -61,13 +61,13 @@ class ClientServiceHandler(MasterHandler
conn.answer(Packets.AnswerNewOIDs(app.tm.getNextOIDList(num_oids)))
app.broadcastLastOID()
- def askFinishTransaction(self, conn, tid, oid_list):
+ def askFinishTransaction(self, conn, ttid, oid_list):
app = self.app
# Collect partitions related to this transaction.
getPartition = app.pt.getPartition
partition_set = set()
- partition_set.add(getPartition(tid))
+ partition_set.add(getPartition(ttid))
partition_set.update((getPartition(oid) for oid in oid_list))
# Collect the UUIDs of nodes related to this transaction.
@@ -79,6 +79,22 @@ class ClientServiceHandler(MasterHandler
if cell.getNodeState() != NodeStates.HIDDEN)
if isStorageReady(uuid)))
+ if not uuid_set:
+ raise ProtocolError('No storage node ready for transaction')
+
+ identified_node_list = app.nm.getIdentifiedList(pool_set=uuid_set)
+ usable_uuid_set = set((x.getUUID() for x in identified_node_list))
+ partitions = app.pt.getPartitions()
+ peer_id = conn.getPeerId()
+ node = app.nm.getByUUID(conn.getUUID())
+ try:
+ tid = app.tm.prepare(node, ttid, partitions, oid_list,
+ usable_uuid_set, peer_id)
+ except DelayedError:
+ app.queueEvent(self.askFinishTransaction, conn, ttid,
+ oid_list)
+ return
+
# check if greater and foreign OID was stored
if app.tm.updateLastOID(oid_list):
app.broadcastLastOID()
@@ -86,14 +102,9 @@ class ClientServiceHandler(MasterHandler
# Request locking data.
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
- p = Packets.AskLockInformation(tid, oid_list)
- used_uuid_set = set()
- for node in app.nm.getIdentifiedList(pool_set=uuid_set):
+ p = Packets.AskLockInformation(ttid, tid, oid_list)
+ for node in identified_node_list:
node.ask(p, timeout=60)
- used_uuid_set.add(node.getUUID())
-
- node = app.nm.getByUUID(conn.getUUID())
- app.tm.prepare(node, tid, oid_list, used_uuid_set, conn.getPeerId())
def askPack(self, conn, tid):
app = self.app
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -88,10 +88,12 @@ class StorageServiceHandler(BaseServiceH
app = self.app
tm = app.tm
t = tm[tid]
+ ttid = t.getTTID()
nm = app.nm
transaction_node = t.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, t.getOIDList())
- answer_transaction_finished = Packets.AnswerTransactionFinished(tid)
+ answer_transaction_finished = Packets.AnswerTransactionFinished(ttid,
+ tid)
for client_node in nm.getClientList(only_identified=True):
c = client_node.getConnection()
if client_node is transaction_node:
@@ -100,7 +102,7 @@ class StorageServiceHandler(BaseServiceH
c.notify(invalidate_objects)
# - Unlock Information to relevant storage nodes.
- notify_unlock = Packets.NotifyUnlockInformation(tid)
+ notify_unlock = Packets.NotifyUnlockInformation(ttid)
for storage_uuid in t.getUUIDList():
nm.getByUUID(storage_uuid).getConnection().notify(notify_unlock)
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -19,7 +19,7 @@ from time import time, gmtime
from struct import pack, unpack
from neo.protocol import ZERO_TID
from datetime import timedelta, datetime
-from neo.util import dump
+from neo.util import dump, u64, p64
import neo
TID_LOW_OVERFLOW = 2**32
@@ -92,11 +92,12 @@ class Transaction(object):
A pending transaction
"""
- def __init__(self, node, tid, oid_list, uuid_list, msg_id):
+ def __init__(self, node, ttid, tid, oid_list, uuid_list, msg_id):
"""
Prepare the transaction, set OIDs and UUIDs related to it
"""
self._node = node
+ self._ttid = ttid
self._tid = tid
self._oid_list = oid_list
self._msg_id = msg_id
@@ -122,6 +123,12 @@ class Transaction(object):
"""
return self._node
+ def getTTID(self):
+ """
+ Return the temporary transaction ID.
+ """
+ return self._ttid
+
def getTID(self):
"""
Return the transaction ID
@@ -184,6 +191,8 @@ class TransactionManager(object):
# We don't need to use a real lock, as we are mono-threaded.
_locked = None
+ _next_ttid = 0
+
def __init__(self):
# tid -> transaction
self._tid_dict = {}
@@ -232,8 +241,18 @@ class TransactionManager(object):
def getLastOID(self):
return self._last_oid
- def _nextTID(self):
- """ Compute the next TID based on the current time and check collisions """
+ def _nextTID(self, ttid, divisor):
+ """
+ Compute the next TID based on the current time and check collisions.
+ Also, adjust it so that
+ tid % divisor == ttid % divisor
+ while preserving
+ min_tid < tid
+ When constraints allow, prefer decreasing generated TID, to avoid
+ fast-forwarding to future dates.
+ """
+ assert isinstance(ttid, basestring), repr(ttid)
+ assert isinstance(divisor, (int, long)), repr(divisor)
tm = time()
gmt = gmtime(tm)
tid = packTID((
@@ -241,8 +260,28 @@ class TransactionManager(object):
gmt.tm_min),
int((gmt.tm_sec % 60 + (tm - int(tm))) / SECOND_PER_TID_LOW)
))
- if tid <= self._last_tid:
- tid = addTID(self._last_tid, 1)
+ min_tid = self._last_tid
+ if tid <= min_tid:
+ tid = addTID(min_tid, 1)
+ # We know we won't have room to adjust by decreasing.
+ try_decrease = False
+ else:
+ try_decrease = True
+ ref_remainder = u64(ttid) % divisor
+ remainder = u64(tid) % divisor
+ if ref_remainder != remainder:
+ if try_decrease:
+ new_tid = addTID(tid, ref_remainder - divisor - remainder)
+ assert u64(new_tid) % divisor == ref_remainder, (dump(new_tid),
+ ref_remainder)
+ if new_tid <= min_tid:
+ new_tid = addTID(new_tid, divisor)
+ else:
+ if ref_remainder > remainder:
+ ref_remainder += divisor
+ new_tid = addTID(tid, ref_remainder - remainder)
+ assert min_tid < new_tid, (dump(min_tid), dump(tid), dump(new_tid))
+ tid = new_tid
self._last_tid = tid
return self._last_tid
@@ -258,6 +297,14 @@ class TransactionManager(object):
"""
self._last_tid = max(self._last_tid, tid)
+ def getTTID(self):
+ """
+ Generate a temporary TID, to be used only during a single node's
+ 2PC.
+ """
+ self._next_ttid += 1
+ return p64(self._next_ttid)
+
def reset(self):
"""
Discard all manager content
@@ -282,28 +329,47 @@ class TransactionManager(object):
"""
Generate a new TID
"""
- if self._locked is not None:
- raise DelayedError()
if tid is None:
- tid = self._nextTID()
- self._locked = tid
+ # No TID requested, generate a temporary one
+ tid = self.getTTID()
+ else:
+ # TID requested, take commit lock immediately
+ if self._locked is not None:
+ raise DelayedError()
+ self._locked = tid
return tid
- def prepare(self, node, tid, oid_list, uuid_list, msg_id):
+ def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id):
"""
Prepare a transaction to be finished
"""
+ locked = self._locked
+ if locked == ttid:
+ # Transaction requested some TID upon begin, and it owns the commit
+ # lock since then.
+ tid = ttid
+ else:
+ # Otherwise, acquire lock and allocate a new TID.
+ if locked is not None:
+ raise DelayedError()
+ tid = self._nextTID(ttid, divisor)
+ self._locked = tid
+
self.setLastTID(tid)
- txn = Transaction(node, tid, oid_list, uuid_list, msg_id)
+ txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
self._tid_dict[tid] = txn
self._node_dict.setdefault(node, {})[tid] = txn
+ return tid
def remove(self, tid):
"""
Remove a transaction, commited or aborted
"""
- assert self._locked == tid, (self._locked, tid)
- self._locked = None
+ if tid == self._locked:
+ # If TID has the lock, release it.
+ # It might legitimately not have the lock (ex: a transaction
+ # aborting, which didn't request a TID upon begin)
+ self._locked = None
tid_dict = self._tid_dict
if tid in tid_dict:
# ...and tried to finish
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -789,30 +789,30 @@ class AnswerTransactionFinished(Packet):
"""
Answer when a transaction is finished. PM -> C.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
+ def _encode(self, ttid, tid):
+ return _encodeTID(ttid) + _encodeTID(tid)
def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
+ (ttid, tid) = unpack('8s8s', body)
+ return (_decodeTID(ttid), _decodeTID(tid))
class AskLockInformation(Packet):
"""
Lock information on a transaction. PM -> S.
"""
# XXX: Identical to InvalidateObjects and AskFinishTransaction
- _header_format = '!8sL'
+ _header_format = '!8s8sL'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
- def _encode(self, tid, oid_list):
- body = [pack(self._header_format, tid, len(oid_list))]
+ def _encode(self, ttid, tid, oid_list):
+ body = [pack(self._header_format, ttid, tid, len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
offset = self._header_len
- (tid, n) = unpack(self._header_format, body[:offset])
+ (ttid, tid, n) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
@@ -821,7 +821,7 @@ class AskLockInformation(Packet):
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
oid_list.append(oid)
- return (tid, oid_list)
+ return (ttid, tid, oid_list)
class AnswerInformationLocked(Packet):
"""
Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -53,10 +53,10 @@ class MasterOperationHandler(BaseMasterH
elif state == CellStates.OUT_OF_DATE:
app.replicator.addPartition(offset)
- def askLockInformation(self, conn, tid, oid_list):
- if not tid in self.app.tm:
+ def askLockInformation(self, conn, ttid, tid, oid_list):
+ if not ttid in self.app.tm:
raise ProtocolError('Unknown transaction')
- self.app.tm.lock(tid, oid_list)
+ self.app.tm.lock(ttid, tid, oid_list)
if not conn.isClosed():
conn.answer(Packets.AnswerInformationLocked(tid))
Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -44,10 +44,11 @@ class Transaction(object):
"""
Container for a pending transaction
"""
+ _tid = None
- def __init__(self, uuid, tid):
+ def __init__(self, uuid, ttid):
self._uuid = uuid
- self._tid = tid
+ self._ttid = ttid
self._object_dict = {}
self._transaction = None
self._locked = False
@@ -55,8 +56,9 @@ class Transaction(object):
self._checked_set = set()
def __repr__(self):
- return "<%s(tid=%r, uuid=%r, locked=%r, age=%.2fs)> at %x" % (
+ return "<%s(ttid=%r, tid=%r, uuid=%r, locked=%r, age=%.2fs)> at %x" % (
self.__class__.__name__,
+ dump(self._ttid),
dump(self._tid),
dump(self._uuid),
self.isLocked(),
@@ -67,6 +69,14 @@ class Transaction(object):
def addCheckedObject(self, oid):
self._checked_set.add(oid)
+ def getTTID(self):
+ return self._ttid
+
+ def setTID(self, tid):
+ assert self._tid is None, dump(self._tid)
+ assert tid is not None
+ self._tid = tid
+
def getTID(self):
return self._tid
@@ -158,20 +168,21 @@ class TransactionManager(object):
self._load_lock_dict.clear()
self._uuid_dict.clear()
- def lock(self, tid, oid_list):
+ def lock(self, ttid, tid, oid_list):
"""
Lock a transaction
"""
- transaction = self._transaction_dict[tid]
+ transaction = self._transaction_dict[ttid]
# remember that the transaction has been locked
transaction.lock()
for oid in transaction.getOIDList():
- self._load_lock_dict[oid] = tid
+ self._load_lock_dict[oid] = ttid
# check every object that should be locked
uuid = transaction.getUUID()
is_assigned = self._app.pt.isAssigned
for oid in oid_list:
- if is_assigned(oid, uuid) and self._load_lock_dict.get(oid) != tid:
+ if is_assigned(oid, uuid) and \
+ self._load_lock_dict.get(oid) != ttid:
raise ValueError, 'Some locks are not held'
object_list = transaction.getObjectList()
# txn_info is None is the transaction information is not stored on
@@ -179,13 +190,18 @@ class TransactionManager(object):
txn_info = transaction.getTransactionInformations()
# store data from memory to temporary table
self._app.dm.storeTransaction(tid, object_list, txn_info)
+ # ...and remember its definitive TID
+ transaction.setTID(tid)
+
+ def getTIDFromTTID(self, ttid):
+ return self._transaction_dict[ttid].getTID()
- def unlock(self, tid):
+ def unlock(self, ttid):
"""
Unlock transaction
"""
- self._app.dm.finishTransaction(tid)
- self.abort(tid, even_if_locked=True)
+ self._app.dm.finishTransaction(self.getTIDFromTTID(ttid))
+ self.abort(ttid, even_if_locked=True)
def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
"""
@@ -283,8 +299,8 @@ class TransactionManager(object):
Abort any non-locked transaction of a node
"""
# abort any non-locked transaction of this node
- for tid in [x.getTID() for x in self._uuid_dict.get(uuid, [])]:
- self.abort(tid)
+ for ttid in [x.getTTID() for x in self._uuid_dict.get(uuid, [])]:
+ self.abort(ttid)
# cleanup _uuid_dict if no transaction remains for this node
transaction_set = self._uuid_dict.get(uuid)
if transaction_set is not None and not transaction_set:
Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -705,7 +705,7 @@ class ClientApplicationTests(NeoUnitTest
def hook(tid):
self.f_called = True
self.f_called_with_tid = tid
- packet = Packets.AnswerTransactionFinished(INVALID_TID)
+ packet = Packets.AnswerTransactionFinished(INVALID_TID, INVALID_TID)
packet.setId(0)
app.master_conn = Mock({
'getNextId': 1,
@@ -722,8 +722,7 @@ class ClientApplicationTests(NeoUnitTest
app.local_var.txn_voted = True
self.assertRaises(NEOStorageError, app.tpc_finish, txn,
dummy_tryToResolveConflict, hook)
- self.assertTrue(self.f_called)
- self.assertEquals(self.f_called_with_tid, tid)
+ self.assertFalse(self.f_called)
self.assertEqual(self.vote_params, None)
self.checkAskFinishTransaction(app.master_conn)
self.checkDispatcherRegisterCalled(app, app.master_conn)
@@ -732,7 +731,7 @@ class ClientApplicationTests(NeoUnitTest
self.f_called = False
self.assertRaises(NEOStorageError, app.tpc_finish, txn,
dummy_tryToResolveConflict, hook)
- self.assertTrue(self.f_called)
+ self.assertFalse(self.f_called)
self.assertTrue(self.vote_params[0] is txn)
self.assertTrue(self.vote_params[1] is dummy_tryToResolveConflict)
app.tpc_vote = tpc_vote
@@ -741,14 +740,15 @@ class ClientApplicationTests(NeoUnitTest
# transaction is finished
app = self.getApp()
tid = self.makeTID()
+ ttid = self.makeTID()
txn = self.makeTransactionObject()
- app.local_var.txn, app.local_var.tid = txn, tid
+ app.local_var.txn, app.local_var.tid = txn, ttid
self.f_called = False
self.f_called_with_tid = None
def hook(tid):
self.f_called = True
self.f_called_with_tid = tid
- packet = Packets.AnswerTransactionFinished(tid)
+ packet = Packets.AnswerTransactionFinished(ttid, tid)
packet.setId(0)
app.master_conn = Mock({
'getNextId': 1,
Modified: trunk/neo/tests/client/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -248,17 +248,18 @@ class MasterAnswersHandlerTests(MasterHa
def test_answerTransactionFinished(self):
conn = self.getConnection()
- tid1 = self.getNextTID()
- tid2 = self.getNextTID(tid1)
+ ttid1 = self.getNextTID()
+ ttid2 = self.getNextTID(ttid1)
+ tid2 = self.getNextTID(ttid2)
# wrong TID
- self.app = Mock({'getTID': tid1})
+ self.app = Mock({'getTID': ttid1})
self.assertRaises(NEOStorageError,
self.handler.answerTransactionFinished,
- conn, tid2)
+ conn, ttid2, tid2)
# matching TID
- app = Mock({'getTID': tid2})
+ app = Mock({'getTID': ttid2})
handler = PrimaryAnswersHandler(app=app)
- handler.answerTransactionFinished(conn, tid2)
+ handler.answerTransactionFinished(conn, ttid2, tid2)
def test_answerPack(self):
self.assertRaises(NEOStorageError, self.handler.answerPack, None, False)
Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -65,13 +65,33 @@ class MasterClientHandlerTests(NeoUnitTe
# Tests
def test_07_askBeginTransaction(self):
+ tid1 = self.getNextTID()
+ tid2 = self.getNextTID()
service = self.service
- ltid = self.app.tm.getLastTID()
+ tm_org = self.app.tm
+ self.app.tm = tm = Mock({
+ 'begin': '\x00\x00\x00\x00\x00\x00\x00\x01',
+ })
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.askBeginTransaction(conn, None)
- self.assertTrue(ltid < self.app.tm.getLastTID())
+ calls = tm.mockGetNamedCalls('begin')
+ self.assertEqual(len(calls), 1)
+ calls[0].checkArgs(None)
+ # Client asks for a TID
+ self.app.tm = tm_org
+ service.askBeginTransaction(conn, tid1)
+ # If asking again for a TID, call is queued
+ call_marker = []
+ def queueEvent(*args, **kw):
+ call_marker.append((args, kw))
+ self.app.queueEvent = queueEvent
+ service.askBeginTransaction(conn, tid2)
+ self.assertEqual(len(call_marker), 1)
+ args, kw = call_marker[0]
+ self.assertEqual(kw, {})
+ self.assertEqual(args, (service.askBeginTransaction, conn, tid2))
def test_08_askNewOIDs(self):
service = self.service
@@ -96,11 +116,19 @@ class MasterClientHandlerTests(NeoUnitTe
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
storage_uuid = self.identifyToMasterNode()
storage_conn = self.getFakeConnection(storage_uuid, self.storage_address)
+ storage2_uuid = self.identifyToMasterNode()
+ storage2_conn = self.getFakeConnection(storage2_uuid,
+ (self.storage_address[0], self.storage_address[1] + 1))
+ self.app.setStorageReady(storage2_uuid)
self.assertNotEquals(uuid, client_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.pt = Mock({
'getPartition': 0,
- 'getCellList': [Mock({'getUUID': storage_uuid})],
+ 'getCellList': [
+ Mock({'getUUID': storage_uuid}),
+ Mock({'getUUID': storage2_uuid}),
+ ],
+ 'getPartitions': 2,
})
service.askBeginTransaction(conn, None)
oid_list = []
@@ -111,6 +139,7 @@ class MasterClientHandlerTests(NeoUnitTe
self.assertFalse(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, tid, oid_list)
self.checkNoPacketSent(storage_conn)
+ self.app.tm.abortFor(self.app.nm.getByUUID(client_uuid))
# ...but AskLockInformation is sent if it is ready
self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid))
@@ -118,8 +147,7 @@ class MasterClientHandlerTests(NeoUnitTe
self.checkAskLockInformation(storage_conn)
self.assertEquals(len(self.app.tm.getPendingList()), 1)
apptid = self.app.tm.getPendingList()[0]
- self.assertEquals(tid, apptid)
- txn = self.app.tm[tid]
+ txn = self.app.tm[apptid]
self.assertEquals(len(txn.getOIDList()), 0)
self.assertEquals(len(txn.getUUIDList()), 1)
Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -101,8 +101,9 @@ class MasterStorageHandlerTests(NeoUnitT
oid_list = self.getOID(), self.getOID()
msg_id = 1
# register a transaction
- tid = self.app.tm.begin()
- self.app.tm.prepare(client_1, tid, oid_list, uuid_list, msg_id)
+ ttid = self.app.tm.begin()
+ tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list,
+ msg_id)
self.assertTrue(tid in self.app.tm)
# the first storage acknowledge the lock
self.service.answerInformationLocked(storage_conn_1, tid)
@@ -145,18 +146,13 @@ class MasterStorageHandlerTests(NeoUnitT
# create some transaction
node, conn = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
port=self.client_port)
- def create_transaction(index):
- tid = self.getNextTID()
- oid_list = [self.getOID(index)]
- self.app.tm.prepare(node, tid, oid_list, [node.getUUID()], index)
- create_transaction(1)
- create_transaction(2)
- create_transaction(3)
+ self.app.tm.prepare(node, self.getNextTID(), 1,
+ [self.getOID(1)], [node.getUUID()], 1)
conn = self.getFakeConnection(node.getUUID(), self.storage_address)
service.askUnfinishedTransactions(conn)
packet = self.checkAnswerUnfinishedTransactions(conn)
(tid_list, ) = packet.decode()
- self.assertEqual(len(tid_list), 3)
+ self.assertEqual(len(tid_list), 1)
def _testWithMethod(self, method, state):
# define two nodes
@@ -212,10 +208,11 @@ class MasterStorageHandlerTests(NeoUnitT
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
- tid1 = tm.begin()
- tm.prepare(client1, tid1, oid_list,
+ ttid1 = tm.begin()
+ tid1 = tm.prepare(client1, ttid1, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(tid1, node2.getUUID())
+ self.checkNoPacketSent(cconn1)
# Storage 1 dies
node1.setTemporarilyDown()
self.service.nodeLost(conn1, node1)
@@ -229,8 +226,8 @@ class MasterStorageHandlerTests(NeoUnitT
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
- tid2 = tm.begin()
- tm.prepare(client2, tid2, oid_list,
+ ttid2 = tm.begin()
+ tid2 = tm.prepare(client2, ttid2, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False)
@@ -238,8 +235,8 @@ class MasterStorageHandlerTests(NeoUnitT
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
- tid3 = tm.begin()
- tm.prepare(client3, tid3, oid_list,
+ ttid3 = tm.begin()
+ tid3 = tm.prepare(client3, ttid3, 1, oid_list,
[node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False)
Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -39,11 +39,12 @@ class testTransactionManager(NeoUnitTest
# test data
node = Mock({'__repr__': 'Node'})
tid = self.makeTID(1)
+ ttid = self.makeTID(2)
oid_list = (oid1, oid2) = [self.makeOID(1), self.makeOID(2)]
uuid_list = (uuid1, uuid2) = [self.makeUUID(1), self.makeUUID(2)]
msg_id = 1
# create transaction object
- txn = Transaction(node, tid, oid_list, uuid_list, msg_id)
+ txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
self.assertEqual(txn.getUUIDList(), uuid_list)
self.assertEqual(txn.getOIDList(), oid_list)
# lock nodes one by one
@@ -63,12 +64,12 @@ class testTransactionManager(NeoUnitTest
self.assertFalse(txnman.hasPending())
self.assertEqual(txnman.getPendingList(), [])
# begin the transaction
- tid = txnman.begin()
- self.assertTrue(tid is not None)
+ ttid = txnman.begin()
+ self.assertTrue(ttid is not None)
self.assertFalse(txnman.hasPending())
self.assertEqual(len(txnman.getPendingList()), 0)
# prepare the transaction
- txnman.prepare(node, tid, oid_list, uuid_list, msg_id)
+ tid = txnman.prepare(node, ttid, 1, oid_list, uuid_list, msg_id)
self.assertTrue(txnman.hasPending())
self.assertEqual(txnman.getPendingList()[0], tid)
self.assertEqual(txnman[tid].getTID(), tid)
@@ -91,8 +92,8 @@ class testTransactionManager(NeoUnitTest
txnman = TransactionManager()
# register 4 transactions made by two nodes
self.assertEqual(txnman.getPendingList(), [])
- tid1 = txnman.begin()
- txnman.prepare(node1, tid1, oid_list, [storage_1_uuid], 1)
+ ttid1 = txnman.begin()
+ tid1 = txnman.prepare(node1, ttid1, 1, oid_list, [storage_1_uuid], 1)
self.assertEqual(txnman.getPendingList(), [tid1])
# abort transactions of another node, transaction stays
txnman.abortFor(node2)
@@ -101,8 +102,8 @@ class testTransactionManager(NeoUnitTest
txnman.abortFor(node1)
self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending())
- # ...and we can start another transaction
- tid2 = txnman.begin()
+ # ...and the lock is available
+ txnman.begin(self.getNextTID())
def test_getNextOIDList(self):
txnman = TransactionManager()
@@ -117,29 +118,6 @@ class testTransactionManager(NeoUnitTest
for i, oid in zip(xrange(len(oid_list)), oid_list):
self.assertEqual(oid, self.getOID(i+2))
- def test_getNextTID(self):
- txnman = TransactionManager()
- # no previous TID
- self.assertEqual(txnman.getLastTID(), ZERO_TID)
- # first transaction
- node1 = Mock({'__hash__': 1})
- tid1 = txnman.begin()
- self.assertTrue(tid1 is not None)
- self.assertEqual(txnman.getLastTID(), tid1)
- # set a new last TID
- ntid = pack('!Q', unpack('!Q', tid1)[0] + 10)
- txnman.setLastTID(ntid)
- self.assertEqual(txnman.getLastTID(), ntid)
- self.assertTrue(ntid > tid1)
- # If a new TID is generated, DelayedError is raised
- self.assertRaises(DelayedError, txnman.begin)
- txnman.remove(tid1)
- # new trancation
- node2 = Mock({'__hash__': 2})
- tid2 = txnman.begin()
- self.assertTrue(tid2 is not None)
- self.assertTrue(tid2 > ntid > tid1)
-
def test_forget(self):
client1 = Mock({'__hash__': 1})
client2 = Mock({'__hash__': 2})
@@ -152,8 +130,9 @@ class testTransactionManager(NeoUnitTest
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
- tid1 = tm.begin()
- tm.prepare(client1, tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
+ ttid1 = tm.begin()
+ tid1 = tm.prepare(client1, ttid1, 1, oid_list,
+ [storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(tid1, storage_2_uuid)
t1 = tm[tid1]
self.assertFalse(t1.locked())
@@ -165,8 +144,9 @@ class testTransactionManager(NeoUnitTest
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
- tid2 = tm.begin()
- tm.prepare(client2, tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
+ ttid2 = tm.begin()
+ tid2 = tm.prepare(client2, ttid2, 1, oid_list,
+ [storage_1_uuid, storage_2_uuid], msg_id_2)
t2 = tm[tid2]
self.assertFalse(t2.locked())
# Storage 1 dies:
@@ -178,8 +158,9 @@ class testTransactionManager(NeoUnitTest
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
- tid3 = tm.begin()
- tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
+ ttid3 = tm.begin()
+ tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ],
+ msg_id_3)
t3 = tm[tid3]
self.assertFalse(t3.locked())
# Storage 1 dies:
@@ -222,12 +203,21 @@ class testTransactionManager(NeoUnitTest
Note: this implementation might change later, to allow more paralelism.
"""
tm = TransactionManager()
- tid1 = tm.begin()
- # Further calls fail with DelayedError
- self.assertRaises(DelayedError, tm.begin)
- # ...until tid1 gets removed
+ # With a requested TID, lock spans from begin to remove
+ ttid1 = self.getNextTID()
+ ttid2 = self.getNextTID()
+ tid1 = tm.begin(ttid1)
+ self.assertEqual(tid1, ttid1)
+ self.assertRaises(DelayedError, tm.begin, ttid2)
tm.remove(tid1)
- tid2 = tm.begin()
+ tm.remove(tm.begin(ttid2))
+ # Without a requested TID, lock spans from prepare to remove only
+ ttid3 = tm.begin()
+ ttid4 = tm.begin() # Doesn't raise
+ tid4 = tm.prepare(None, ttid4, 1, [], [], 0)
+ self.assertRaises(DelayedError, tm.prepare, None, ttid3, 1, [], [], 0)
+ tm.remove(tid4)
+ tm.prepare(None, ttid3, 1, [], [], 0)
if __name__ == '__main__':
unittest.main()
Modified: trunk/neo/tests/storage/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -132,20 +132,22 @@ class StorageMasterHandlerTests(NeoUnitT
conn = self._getConnection()
oid_list = [self.getOID(1), self.getOID(2)]
tid = self.getNextTID()
+ ttid = self.getNextTID()
handler = self.operation
- self.assertRaises(ProtocolError, handler.askLockInformation, conn, tid,
- oid_list)
+ self.assertRaises(ProtocolError, handler.askLockInformation, conn,
+ ttid, tid, oid_list)
def test_askLockInformation2(self):
""" Lock transaction """
self.app.tm = Mock({'__contains__': True})
conn = self._getConnection()
tid = self.getNextTID()
+ ttid = self.getNextTID()
oid_list = [self.getOID(1), self.getOID(2)]
- self.operation.askLockInformation(conn, tid, oid_list)
+ self.operation.askLockInformation(conn, ttid, tid, oid_list)
calls = self.app.tm.mockGetNamedCalls('lock')
self.assertEqual(len(calls), 1)
- calls[0].checkArgs(tid, oid_list)
+ calls[0].checkArgs(ttid, tid, oid_list)
self.checkAnswerInformationLocked(conn)
def test_notifyUnlockInformation1(self):
Modified: trunk/neo/tests/storage/testTransactions.py
==============================================================================
--- trunk/neo/tests/storage/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testTransactions.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -27,9 +27,13 @@ class TransactionTests(NeoUnitTestBase):
def testInit(self):
uuid = self.getNewUUID()
+ ttid = self.getNextTID()
tid = self.getNextTID()
- txn = Transaction(uuid, tid)
+ txn = Transaction(uuid, ttid)
self.assertEqual(txn.getUUID(), uuid)
+ self.assertEqual(txn.getTTID(), ttid)
+ self.assertEqual(txn.getTID(), None)
+ txn.setTID(tid)
self.assertEqual(txn.getTID(), tid)
self.assertEqual(txn.getObjectList(), [])
self.assertEqual(txn.getOIDList(), [])
@@ -122,38 +126,41 @@ class TransactionManagerTests(NeoUnitTes
def testSimpleCase(self):
""" One node, one transaction, not abort """
uuid = self.getNewUUID()
+ ttid = self.getNextTID()
tid, txn = self._getTransaction()
serial1, object1 = self._getObject(1)
serial2, object2 = self._getObject(2)
- self.manager.register(uuid,tid)
- self.manager.storeTransaction(tid, *txn)
- self.manager.storeObject(tid, serial1, *object1)
- self.manager.storeObject(tid, serial2, *object2)
- self.assertTrue(tid in self.manager)
- self.manager.lock(tid, txn[0])
+ self.manager.register(uuid, ttid)
+ self.manager.storeTransaction(ttid, *txn)
+ self.manager.storeObject(ttid, serial1, *object1)
+ self.manager.storeObject(ttid, serial2, *object2)
+ self.assertTrue(ttid in self.manager)
+ self.manager.lock(ttid, tid, txn[0])
self._checkTransactionStored(tid, [object1, object2], txn)
- self.manager.unlock(tid)
- self.assertFalse(tid in self.manager)
+ self.manager.unlock(ttid)
+ self.assertFalse(ttid in self.manager)
self._checkTransactionFinished(tid)
def testDelayed(self):
""" Two transactions, the first cause delaytion of the second """
uuid = self.getNewUUID()
+ ttid1 = self.getNextTID()
+ ttid2 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1)
# first transaction lock the object
- self.manager.register(uuid, tid1)
- self.manager.storeTransaction(tid1, *txn1)
- self.assertTrue(tid1 in self.manager)
- self._storeTransactionObjects(tid1, txn1)
- self.manager.lock(tid1, txn1[0])
+ self.manager.register(uuid, ttid1)
+ self.manager.storeTransaction(ttid1, *txn1)
+ self.assertTrue(ttid1 in self.manager)
+ self._storeTransactionObjects(ttid1, txn1)
+ self.manager.lock(ttid1, tid1, txn1[0])
# the second is delayed
- self.manager.register(uuid, tid2)
- self.manager.storeTransaction(tid2, *txn2)
- self.assertTrue(tid2 in self.manager)
+ self.manager.register(uuid, ttid2)
+ self.manager.storeTransaction(ttid2, *txn2)
+ self.assertTrue(ttid2 in self.manager)
self.assertRaises(DelayedError, self.manager.storeObject,
- tid2, serial, *obj)
+ ttid2, serial, *obj)
def testResolvableConflict(self):
""" Try to store an object with the lastest revision """
@@ -173,50 +180,54 @@ class TransactionManagerTests(NeoUnitTes
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
+ ttid1 = self.getNextTID()
+ ttid2 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2)
# first transaction lock objects
- self.manager.register(uuid1, tid1)
- self.manager.storeTransaction(tid1, *txn1)
- self.assertTrue(tid1 in self.manager)
- self.manager.storeObject(tid1, serial1, *obj1)
- self.manager.storeObject(tid1, serial1, *obj2)
- self.manager.lock(tid1, txn1[0])
+ self.manager.register(uuid1, ttid1)
+ self.manager.storeTransaction(ttid1, *txn1)
+ self.assertTrue(ttid1 in self.manager)
+ self.manager.storeObject(ttid1, serial1, *obj1)
+ self.manager.storeObject(ttid1, serial1, *obj2)
+ self.manager.lock(ttid1, tid1, txn1[0])
# second transaction is delayed
- self.manager.register(uuid2, tid2)
- self.manager.storeTransaction(tid2, *txn2)
- self.assertTrue(tid2 in self.manager)
+ self.manager.register(uuid2, ttid2)
+ self.manager.storeTransaction(ttid2, *txn2)
+ self.assertTrue(ttid2 in self.manager)
self.assertRaises(DelayedError, self.manager.storeObject,
- tid2, serial1, *obj1)
+ ttid2, serial1, *obj1)
self.assertRaises(DelayedError, self.manager.storeObject,
- tid2, serial2, *obj2)
+ ttid2, serial2, *obj2)
def testLockConflict(self):
""" Check lock conflict """
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
+ ttid1 = self.getNextTID()
+ ttid2 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2)
# the second transaction lock objects
- self.manager.register(uuid2, tid2)
- self.manager.storeTransaction(tid2, *txn2)
- self.manager.storeObject(tid2, serial1, *obj1)
- self.manager.storeObject(tid2, serial2, *obj2)
- self.assertTrue(tid2 in self.manager)
- self.manager.lock(tid2, txn1[0])
+ self.manager.register(uuid2, ttid2)
+ self.manager.storeTransaction(ttid2, *txn2)
+ self.manager.storeObject(ttid2, serial1, *obj1)
+ self.manager.storeObject(ttid2, serial2, *obj2)
+ self.assertTrue(ttid2 in self.manager)
+ self.manager.lock(ttid2, tid2, txn1[0])
# the first get a delay, as nothing is committed yet
- self.manager.register(uuid1, tid1)
- self.manager.storeTransaction(tid1, *txn1)
- self.assertTrue(tid1 in self.manager)
+ self.manager.register(uuid1, ttid1)
+ self.manager.storeTransaction(ttid1, *txn1)
+ self.assertTrue(ttid1 in self.manager)
self.assertRaises(DelayedError, self.manager.storeObject,
- tid1, serial1, *obj1)
+ ttid1, serial1, *obj1)
self.assertRaises(DelayedError, self.manager.storeObject,
- tid1, serial2, *obj2)
+ ttid1, serial2, *obj2)
def testAbortUnlocked(self):
""" Abort a non-locked transaction """
@@ -236,15 +247,16 @@ class TransactionManagerTests(NeoUnitTes
def testAbortLockedDoNothing(self):
""" Try to abort a locked transaction """
uuid = self.getNewUUID()
+ ttid = self.getNextTID()
tid, txn = self._getTransaction()
- self.manager.register(uuid, tid)
- self.manager.storeTransaction(tid, *txn)
- self._storeTransactionObjects(tid, txn)
+ self.manager.register(uuid, ttid)
+ self.manager.storeTransaction(ttid, *txn)
+ self._storeTransactionObjects(ttid, txn)
# lock transaction
- self.manager.lock(tid, txn[0])
- self.assertTrue(tid in self.manager)
- self.manager.abort(tid)
- self.assertTrue(tid in self.manager)
+ self.manager.lock(ttid, tid, txn[0])
+ self.assertTrue(ttid in self.manager)
+ self.manager.abort(ttid)
+ self.assertTrue(ttid in self.manager)
for oid in txn[0]:
self.assertTrue(self.manager.loadLocked(oid))
self._checkQueuedEventExecuted(number=0)
@@ -254,39 +266,43 @@ class TransactionManagerTests(NeoUnitTes
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
+ ttid1 = self.getNextTID()
+ ttid2 = self.getNextTID()
+ ttid3 = self.getNextTID()
tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction()
tid3, txn3 = self._getTransaction()
- self.manager.register(uuid1, tid1)
- self.manager.register(uuid2, tid2)
- self.manager.register(uuid2, tid3)
- self.manager.storeTransaction(tid1, *txn1)
+ self.manager.register(uuid1, ttid1)
+ self.manager.register(uuid2, ttid2)
+ self.manager.register(uuid2, ttid3)
+ self.manager.storeTransaction(ttid1, *txn1)
# node 2 owns tid2 & tid3 and lock tid2 only
- self.manager.storeTransaction(tid2, *txn2)
- self.manager.storeTransaction(tid3, *txn3)
- self._storeTransactionObjects(tid2, txn2)
- self.manager.lock(tid2, txn2[0])
- self.assertTrue(tid1 in self.manager)
- self.assertTrue(tid2 in self.manager)
- self.assertTrue(tid3 in self.manager)
+ self.manager.storeTransaction(ttid2, *txn2)
+ self.manager.storeTransaction(ttid3, *txn3)
+ self._storeTransactionObjects(ttid2, txn2)
+ self.manager.lock(ttid2, tid2, txn2[0])
+ self.assertTrue(ttid1 in self.manager)
+ self.assertTrue(ttid2 in self.manager)
+ self.assertTrue(ttid3 in self.manager)
self.manager.abortFor(uuid2)
# only tid3 is aborted
- self.assertTrue(tid1 in self.manager)
- self.assertTrue(tid2 in self.manager)
- self.assertFalse(tid3 in self.manager)
+ self.assertTrue(ttid1 in self.manager)
+ self.assertTrue(ttid2 in self.manager)
+ self.assertFalse(ttid3 in self.manager)
self._checkQueuedEventExecuted(number=1)
def testReset(self):
""" Reset the manager """
uuid = self.getNewUUID()
tid, txn = self._getTransaction()
- self.manager.register(uuid, tid)
- self.manager.storeTransaction(tid, *txn)
- self._storeTransactionObjects(tid, txn)
- self.manager.lock(tid, txn[0])
- self.assertTrue(tid in self.manager)
+ ttid = self.getNextTID()
+ self.manager.register(uuid, ttid)
+ self.manager.storeTransaction(ttid, *txn)
+ self._storeTransactionObjects(ttid, txn)
+ self.manager.lock(ttid, tid, txn[0])
+ self.assertTrue(ttid in self.manager)
self.manager.reset()
- self.assertFalse(tid in self.manager)
+ self.assertFalse(ttid in self.manager)
for oid in txn[0]:
self.assertFalse(self.manager.loadLocked(oid))
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Dec 14 16:57:01 2010
@@ -278,18 +278,22 @@ class ProtocolTests(NeoUnitTestBase):
self.assertEqual(p_oid_list, oid_list)
def test_37_answerTransactionFinished(self):
+ ttid = self.getNextTID()
tid = self.getNextTID()
- p = Packets.AnswerTransactionFinished(tid)
- ptid = p.decode()[0]
+ p = Packets.AnswerTransactionFinished(ttid, tid)
+ pttid, ptid = p.decode()
+ self.assertEqual(pttid, ttid)
self.assertEqual(ptid, tid)
def test_38_askLockInformation(self):
oid1 = self.getNextTID()
oid2 = self.getNextTID()
oid_list = [oid1, oid2]
+ ttid = self.getNextTID()
tid = self.getNextTID()
- p = Packets.AskLockInformation(tid, oid_list)
- ptid, p_oid_list = p.decode()
+ p = Packets.AskLockInformation(ttid, tid, oid_list)
+ pttid, ptid, p_oid_list = p.decode()
+ self.assertEqual(pttid, ttid)
self.assertEqual(ptid, tid)
self.assertEqual(oid_list, p_oid_list)
More information about the Neo-report
mailing list