[Neo-report] r2611 gregory - in /trunk/neo: ./ client/ storage/ storage/handlers/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Jan 11 16:48:49 CET 2011
Author: gregory
Date: Tue Jan 11 16:48:49 2011
New Revision: 2611
Log:
Rename tid to ttid to clarify the meaning.
Modified:
trunk/neo/client/app.py
trunk/neo/handler.py
trunk/neo/storage/handlers/client.py
trunk/neo/storage/handlers/master.py
trunk/neo/storage/transactions.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Jan 11 16:48:49 2011
@@ -747,14 +747,14 @@ class Application(object):
self._waitAnyMessage(False)
- def onStoreTimeout(self, conn, msg_id, tid, oid):
+ def onStoreTimeout(self, conn, msg_id, ttid, oid):
# NOTE: this method is called from poll thread, don't use
# local_var !
# Stop expecting the timed-out store request.
queue = self.dispatcher.forget(conn, msg_id)
# Ask the storage if someone locks the object.
# Shorten timeout to react earlier to an unresponding storage.
- conn.ask(Packets.AskHasLock(tid, oid), timeout=5, queue=queue)
+ conn.ask(Packets.AskHasLock(ttid, oid), timeout=5, queue=queue)
return True
@profiler_decorator
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Jan 11 16:48:49 2011
@@ -202,7 +202,7 @@ class EventHandler(object):
def askBeginTransaction(self, conn, tid):
raise UnexpectedPacketError
- def answerBeginTransaction(self, conn, tid):
+ def answerBeginTransaction(self, conn, ttid):
raise UnexpectedPacketError
def askNewOIDs(self, conn, num_oids):
@@ -211,7 +211,7 @@ class EventHandler(object):
def answerNewOIDs(self, conn, num_oids):
raise UnexpectedPacketError
- def askFinishTransaction(self, conn, tid, oid_list):
+ def askFinishTransaction(self, conn, ttid, oid_list):
raise UnexpectedPacketError
def answerTransactionFinished(self, conn, ttid, tid):
@@ -226,27 +226,27 @@ class EventHandler(object):
def invalidateObjects(self, conn, tid, oid_list):
raise UnexpectedPacketError
- def notifyUnlockInformation(self, conn, tid):
+ def notifyUnlockInformation(self, conn, ttid):
raise UnexpectedPacketError
def askStoreObject(self, conn, oid, serial,
- compression, checksum, data, data_serial, tid, unlock):
+ compression, checksum, data, data_serial, ttid, unlock):
raise UnexpectedPacketError
def answerStoreObject(self, conn, conflicting, oid, serial):
raise UnexpectedPacketError
- def abortTransaction(self, conn, tid):
+ def abortTransaction(self, conn, ttid):
raise UnexpectedPacketError
- def askStoreTransaction(self, conn, tid, user, desc,
+ def askStoreTransaction(self, conn, ttid, user, desc,
ext, oid_list):
raise UnexpectedPacketError
- def answerStoreTransaction(self, conn, tid):
+ def answerStoreTransaction(self, conn, ttid):
raise UnexpectedPacketError
- def askObject(self, conn, oid, serial, tid):
+ def askObject(self, conn, oid, serial, ttid):
raise UnexpectedPacketError
def answerObject(self, conn, oid, serial_start,
@@ -327,13 +327,13 @@ class EventHandler(object):
def notifyReplicationDone(self, conn, offset):
raise UnexpectedPacketError
- def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
+ def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
raise UnexpectedPacketError
def answerObjectUndoSerial(self, conn, object_tid_dict):
raise UnexpectedPacketError
- def askHasLock(self, conn, tid, oid):
+ def askHasLock(self, conn, ttid, oid):
raise UnexpectedPacketError
def answerHasLock(self, conn, oid, status):
@@ -375,7 +375,7 @@ class EventHandler(object):
def answerLastTransaction(self, conn, tid):
raise UnexpectedPacketError
- def askCheckCurrentSerial(self, conn, tid, serial, oid):
+ def askCheckCurrentSerial(self, conn, ttid, serial, oid):
raise UnexpectedPacketError
answerCheckCurrentSerial = answerStoreObject
Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Tue Jan 11 16:48:49 2011
@@ -30,8 +30,8 @@ SLOW_STORE = 2
class ClientOperationHandler(BaseClientAndStorageOperationHandler):
- def _askObject(self, oid, serial, tid):
- return self.app.dm.getObject(oid, serial, tid)
+ def _askObject(self, oid, serial, ttid):
+ return self.app.dm.getObject(oid, serial, ttid)
def connectionLost(self, conn, new_state):
uuid = conn.getUUID()
@@ -39,31 +39,31 @@ class ClientOperationHandler(BaseClientA
assert node is not None, conn
self.app.nm.remove(node)
- def abortTransaction(self, conn, tid):
- self.app.tm.abort(tid)
+ def abortTransaction(self, conn, ttid):
+ self.app.tm.abort(ttid)
- def askStoreTransaction(self, conn, tid, user, desc, ext, oid_list):
- self.app.tm.register(conn.getUUID(), tid)
- self.app.tm.storeTransaction(tid, oid_list, user, desc, ext, False)
- conn.answer(Packets.AnswerStoreTransaction(tid))
+ def askStoreTransaction(self, conn, ttid, user, desc, ext, oid_list):
+ self.app.tm.register(conn.getUUID(), ttid)
+ self.app.tm.storeTransaction(ttid, oid_list, user, desc, ext, False)
+ conn.answer(Packets.AnswerStoreTransaction(ttid))
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
- data_serial, tid, unlock, request_time):
- if tid not in self.app.tm:
+ data_serial, ttid, unlock, request_time):
+ if ttid not in self.app.tm:
# transaction was aborted, cancel this event
neo.logging.info('Forget store of %s:%s by %s delayed by %s',
- dump(oid), dump(serial), dump(tid),
+ dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
return
try:
- self.app.tm.storeObject(tid, serial, oid, compression,
+ self.app.tm.storeObject(ttid, serial, oid, compression,
checksum, data, data_serial, unlock)
except ConflictError, err:
# resolvable or not
- tid_or_serial = err.getTID()
- conn.answer(Packets.AnswerStoreObject(1, oid, tid_or_serial))
+ ttid_or_serial = err.getTID()
+ conn.answer(Packets.AnswerStoreObject(1, oid, ttid_or_serial))
except DelayedError:
# locked by a previous transaction, retry later
# If we are unlocking, we want queueEvent to raise
@@ -71,8 +71,8 @@ class ClientOperationHandler(BaseClientA
# response.
try:
self.app.queueEvent(self._askStoreObject, conn, (oid, serial,
- compression, checksum, data, data_serial, tid,
- unlock, request_time), key=(oid, tid),
+ compression, checksum, data, data_serial, ttid,
+ unlock, request_time), key=(oid, ttid),
raise_on_duplicate=unlock)
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
@@ -84,16 +84,16 @@ class ClientOperationHandler(BaseClientA
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
def askStoreObject(self, conn, oid, serial,
- compression, checksum, data, data_serial, tid, unlock):
+ compression, checksum, data, data_serial, ttid, unlock):
# register the transaction
- self.app.tm.register(conn.getUUID(), tid)
+ self.app.tm.register(conn.getUUID(), ttid)
if data_serial is not None:
assert data == '', repr(data)
# Change data to None here, to do it only once, even if store gets
# delayed.
data = None
self._askStoreObject(conn, oid, serial, compression, checksum, data,
- data_serial, tid, unlock, time.time())
+ data_serial, ttid, unlock, time.time())
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list):
app = self.app
@@ -122,14 +122,14 @@ class ClientOperationHandler(BaseClientA
app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerTIDs(tid_list))
- def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
+ def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
app = self.app
findUndoTID = app.dm.findUndoTID
getObjectFromTransaction = app.tm.getObjectFromTransaction
object_tid_dict = {}
for oid in oid_list:
- current_serial, undo_serial, is_current = findUndoTID(oid, tid,
- ltid, undone_tid, getObjectFromTransaction(tid, oid))
+ current_serial, undo_serial, is_current = findUndoTID(oid, ttid,
+ ltid, undone_tid, getObjectFromTransaction(ttid, oid))
if current_serial is None:
p = Errors.OidNotFound(dump(oid))
break
@@ -138,12 +138,12 @@ class ClientOperationHandler(BaseClientA
p = Packets.AnswerObjectUndoSerial(object_tid_dict)
conn.answer(p)
- def askHasLock(self, conn, tid, oid):
+ def askHasLock(self, conn, ttid, oid):
locking_tid = self.app.tm.getLockingTID(oid)
- neo.logging.info('%r check lock of %r:%r', conn, dump(tid), dump(oid))
+ neo.logging.info('%r check lock of %r:%r', conn, dump(ttid), dump(oid))
if locking_tid is None:
state = LockState.NOT_LOCKED
- elif locking_tid is tid:
+ elif locking_tid is ttid:
state = LockState.GRANTED
else:
state = LockState.GRANTED_TO_OTHER
@@ -161,20 +161,20 @@ class ClientOperationHandler(BaseClientA
p = Packets.AnswerObjectHistory(oid, history_list)
conn.answer(p)
- def askCheckCurrentSerial(self, conn, tid, serial, oid):
- self._askCheckCurrentSerial(conn, tid, serial, oid, time.time())
+ def askCheckCurrentSerial(self, conn, ttid, serial, oid):
+ self._askCheckCurrentSerial(conn, ttid, serial, oid, time.time())
- def _askCheckCurrentSerial(self, conn, tid, serial, oid, request_time):
- if tid not in self.app.tm:
+ def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time):
+ if ttid not in self.app.tm:
# transaction was aborted, cancel this event
neo.logging.info('Forget serial check of %s:%s by %s delayed by '
- '%s', dump(oid), dump(serial), dump(tid),
+ '%s', dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
return
try:
- self.app.tm.checkCurrentSerial(tid, serial, oid)
+ self.app.tm.checkCurrentSerial(ttid, serial, oid)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(1, oid,
@@ -182,8 +182,8 @@ class ClientOperationHandler(BaseClientA
except DelayedError:
# locked by a previous transaction, retry later
try:
- self.app.queueEvent(self._askCheckCurrentSerial, conn, (tid,
- serial, oid, request_time), key=(oid, tid))
+ self.app.queueEvent(self._askCheckCurrentSerial, conn, (ttid,
+ serial, oid, request_time), key=(oid, ttid))
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
else:
Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Tue Jan 11 16:48:49 2011
@@ -60,11 +60,11 @@ class MasterOperationHandler(BaseMasterH
if not conn.isClosed():
conn.answer(Packets.AnswerInformationLocked(tid))
- def notifyUnlockInformation(self, conn, tid):
- if not tid in self.app.tm:
+ def notifyUnlockInformation(self, conn, ttid):
+ if not ttid in self.app.tm:
raise ProtocolError('Unknown transaction')
# TODO: send an answer
- self.app.tm.unlock(tid)
+ self.app.tm.unlock(ttid)
def askPack(self, conn, tid):
app = self.app
Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Tue Jan 11 16:48:49 2011
@@ -135,29 +135,29 @@ class TransactionManager(object):
self._load_lock_dict = {}
self._uuid_dict = {}
- def __contains__(self, tid):
+ def __contains__(self, ttid):
"""
Returns True if the TID is known by the manager
"""
- return tid in self._transaction_dict
+ return ttid in self._transaction_dict
- def register(self, uuid, tid):
+ def register(self, uuid, ttid):
"""
Register a transaction, it may be already registered
"""
- transaction = self._transaction_dict.get(tid, None)
+ transaction = self._transaction_dict.get(ttid, None)
if transaction is None:
- transaction = Transaction(uuid, tid)
+ transaction = Transaction(uuid, ttid)
self._uuid_dict.setdefault(uuid, set()).add(transaction)
- self._transaction_dict[tid] = transaction
+ self._transaction_dict[ttid] = transaction
return transaction
- def getObjectFromTransaction(self, tid, oid):
+ def getObjectFromTransaction(self, ttid, oid):
"""
Return object data for given running transaction.
Return None if not found.
"""
- result = self._transaction_dict.get(tid)
+ result = self._transaction_dict.get(ttid)
if result is not None:
result = result.getObject(oid)
return result
@@ -206,18 +206,18 @@ class TransactionManager(object):
self._app.dm.finishTransaction(self.getTIDFromTTID(ttid))
self.abort(ttid, even_if_locked=True)
- def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
+ def storeTransaction(self, ttid, oid_list, user, desc, ext, packed):
"""
Store transaction information received from client node
"""
- assert tid in self, "Transaction not registered"
- transaction = self._transaction_dict[tid]
+ assert ttid in self, "Transaction not registered"
+ transaction = self._transaction_dict[ttid]
transaction.prepare(oid_list, user, desc, ext, packed)
def getLockingTID(self, oid):
return self._store_lock_dict.get(oid)
- def lockObject(self, tid, serial, oid, unlock=False):
+ def lockObject(self, ttid, serial, oid, unlock=False):
"""
Take a write lock on given object, checking that "serial" is
current.
@@ -227,30 +227,30 @@ class TransactionManager(object):
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid)
- if locking_tid == tid and unlock:
+ if locking_tid == ttid and unlock:
neo.logging.info('Deadlock resolution on %r:%r', dump(oid),
- dump(tid))
+ dump(ttid))
# A duplicate store means client is resolving a deadlock, so
# drop the lock it held on this object, and drop object data for
# consistency.
del self._store_lock_dict[oid]
- self._transaction_dict[tid].delObject(oid)
+ self._transaction_dict[ttid].delObject(oid)
# Give a chance to pending events to take that lock now.
self._app.executeQueuedEvents()
# Attemp to acquire lock again.
locking_tid = self._store_lock_dict.get(oid)
- if locking_tid in (None, tid):
+ if locking_tid in (None, ttid):
# check if this is generated from the latest revision.
- if locking_tid == tid:
+ if locking_tid == ttid:
# If previous store was an undo, next store must be based on
# undo target.
_, _, _, _, previous_serial = self._transaction_dict[
- tid].getObject(oid)
+ ttid].getObject(oid)
if previous_serial is None:
# XXX: use some special serial when previous store was not
# an undo ? Maybe it should just not happen.
neo.logging.info('Transaction %s storing %s more than '
- 'once', dump(tid), dump(oid))
+ 'once', dump(ttid), dump(oid))
else:
previous_serial = None
if previous_serial is None:
@@ -259,56 +259,56 @@ class TransactionManager(object):
previous_serial = history_list[0][0]
if previous_serial is not None and previous_serial != serial:
neo.logging.info('Resolvable conflict on %r:%r', dump(oid),
- dump(tid))
+ dump(ttid))
raise ConflictError(previous_serial)
- neo.logging.info('Transaction %s storing %s', dump(tid), dump(oid))
- self._store_lock_dict[oid] = tid
- elif locking_tid > tid:
+ neo.logging.info('Transaction %s storing %s', dump(ttid), dump(oid))
+ self._store_lock_dict[oid] = ttid
+ elif locking_tid > ttid:
# We have a smaller TID than locking transaction, so we are older:
# enter waiting queue so we are handled when lock gets released.
neo.logging.info('Store delayed for %r:%r by %r', dump(oid),
- dump(tid), dump(locking_tid))
+ dump(ttid), dump(locking_tid))
raise DelayedError
else:
- # We have a bigger TID than locking transaction, so we are
+ # We have a bigger TTID than locking transaction, so we are
# younger: this is a possible deadlock case, as we might already
# hold locks that older transaction is waiting upon. Make client
# release locks & reacquire them by notifying it of the possible
# deadlock.
neo.logging.info('Possible deadlock on %r:%r with %r',
- dump(oid), dump(tid), dump(locking_tid))
+ dump(oid), dump(ttid), dump(locking_tid))
raise ConflictError(ZERO_TID)
- def checkCurrentSerial(self, tid, serial, oid):
- self.lockObject(tid, serial, oid, unlock=True)
- assert tid in self, "Transaction not registered"
- transaction = self._transaction_dict[tid]
+ def checkCurrentSerial(self, ttid, serial, oid):
+ self.lockObject(ttid, serial, oid, unlock=True)
+ assert ttid in self, "Transaction not registered"
+ transaction = self._transaction_dict[ttid]
transaction.addCheckedObject(oid)
- def storeObject(self, tid, serial, oid, compression, checksum, data,
+ def storeObject(self, ttid, serial, oid, compression, checksum, data,
value_serial, unlock=False):
"""
Store an object received from client node
"""
- self.lockObject(tid, serial, oid, unlock=unlock)
+ self.lockObject(ttid, serial, oid, unlock=unlock)
# store object
- assert tid in self, "Transaction not registered"
- transaction = self._transaction_dict[tid]
+ assert ttid in self, "Transaction not registered"
+ transaction = self._transaction_dict[ttid]
transaction.addObject(oid, compression, checksum, data, value_serial)
- def abort(self, tid, even_if_locked=False):
+ def abort(self, ttid, even_if_locked=False):
"""
Abort a transaction
Releases locks held on all transaction objects, deletes Transaction
instance, and executed queued events.
Note: does not alter persistent content.
"""
- if tid not in self._transaction_dict:
+ if ttid not in self._transaction_dict:
# the tid may be unknown as the transaction is aborted on every node
# of the partition, even if no data was received (eg. conflict on
# another node)
return
- transaction = self._transaction_dict[tid]
+ transaction = self._transaction_dict[ttid]
has_load_lock = transaction.isLocked()
# if the transaction is locked, ensure we can drop it
if not even_if_locked and has_load_lock:
@@ -316,23 +316,23 @@ class TransactionManager(object):
# unlock any object
for oid in transaction.getLockedOIDList():
if has_load_lock:
- lock_tid = self._load_lock_dict.pop(oid, None)
- assert lock_tid in (tid, None), 'Transaction %s tried to ' \
+ lock_ttid = self._load_lock_dict.pop(oid, None)
+ assert lock_ttid in (ttid, None), 'Transaction %s tried to ' \
'release the lock on oid %s, but it was held by %s' % (
- dump(tid), dump(oid), dump(lock_tid))
+ dump(ttid), dump(oid), dump(lock_tid))
try:
del self._store_lock_dict[oid]
except KeyError:
# all locks might not have been acquiredwhen aborting
neo.logging.warning('%s write lock was not held by %s',
- dump(oid), dump(tid))
+ dump(oid), dump(ttid))
# remove the transaction
uuid = transaction.getUUID()
self._uuid_dict[uuid].discard(transaction)
# clean node index if there is no more current transactions
if not self._uuid_dict[uuid]:
del self._uuid_dict[uuid]
- del self._transaction_dict[tid]
+ del self._transaction_dict[ttid]
# some locks were released, some pending locks may now succeed
self._app.executeQueuedEvents()
@@ -356,11 +356,11 @@ class TransactionManager(object):
for txn in self._transaction_dict.values():
neo.logging.info(' %r', txn)
neo.logging.info(' Read locks:')
- for oid, tid in self._load_lock_dict.items():
- neo.logging.info(' %r by %r', dump(oid), dump(tid))
+ for oid, ttid in self._load_lock_dict.items():
+ neo.logging.info(' %r by %r', dump(oid), dump(ttid))
neo.logging.info(' Write locks:')
- for oid, tid in self._store_lock_dict.items():
- neo.logging.info(' %r by %r', dump(oid), dump(tid))
+ for oid, ttid in self._store_lock_dict.items():
+ neo.logging.info(' %r by %r', dump(oid), dump(ttid))
def updateObjectDataForPack(self, oid, orig_serial, new_serial,
getObjectData):
More information about the Neo-report
mailing list