[Neo-report] r2102 gregory - in /trunk/neo: ./ master/handlers/ storage/ storage/handlers/...
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu May 13 14:04:39 CEST 2010
Author: gregory
Date: Thu May 13 14:04:39 2010
New Revision: 2102
Log:
Storage node check if all objects are stored before set write locks.
Modified:
trunk/neo/handler.py
trunk/neo/master/handlers/client.py
trunk/neo/protocol.py
trunk/neo/pt.py
trunk/neo/storage/handlers/master.py
trunk/neo/storage/transactions.py
trunk/neo/tests/storage/testMasterHandler.py
trunk/neo/tests/storage/testTransactions.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -217,7 +217,7 @@
def answerTransactionFinished(self, conn, tid):
raise UnexpectedPacketError
- def askLockInformation(self, conn, tid):
+ def askLockInformation(self, conn, tid, oid_list):
raise UnexpectedPacketError
def answerInformationLocked(self, conn, tid):
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -75,7 +75,7 @@
# Request locking data.
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
- p = Packets.AskLockInformation(tid)
+ p = Packets.AskLockInformation(tid, oid_list)
used_uuid_set = set()
for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
node.ask(p, timeout=60)
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -783,12 +783,28 @@
"""
Lock information on a transaction. PM -> S.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
+ # XXX: Identical to InvalidateObjects and AskFinishTransaction
+ _header_format = '!8sL'
+ _list_entry_format = '8s'
+ _list_entry_len = calcsize(_list_entry_format)
+
+ def _encode(self, tid, oid_list):
+ body = [pack(self._header_format, tid, len(oid_list))]
+ body.extend(oid_list)
+ return ''.join(body)
+
+ def _decode(self, body):
+ offset = self._header_len
+ (tid, n) = unpack(self._header_format, body[:offset])
+ oid_list = []
+ list_entry_format = self._list_entry_format
+ list_entry_len = self._list_entry_len
+ for _ in xrange(n):
+ next_offset = offset + list_entry_len
+ oid = unpack(list_entry_format, body[offset:next_offset])[0]
+ offset = next_offset
+ oid_list.append(oid)
+ return (tid, oid_list)
class AnswerInformationLocked(Packet):
"""
Modified: trunk/neo/pt.py
==============================================================================
--- trunk/neo/pt.py [iso-8859-1] (original)
+++ trunk/neo/pt.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -142,6 +142,13 @@
def getCellListForOID(self, oid, readable=False, writable=False):
return self.getCellList(self._getPartitionFromIndex(u64(oid)),
readable, writable)
+
+ def isAssigned(self, oid, uuid):
+ """ Check if the oid is assigned to the given node """
+ for cell in self.partition_list[u64(oid) % self.np]:
+ if cell.getUUID() == uuid:
+ return True
+ return False
def _getPartitionFromIndex(self, index):
return index % self.np
Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
-
+from neo.util import dump
from neo.protocol import CellStates, Packets, ProtocolError
from neo.storage.handlers import BaseMasterHandler
@@ -52,10 +52,10 @@
elif state == CellStates.OUT_OF_DATE:
app.replicator.addPartition(offset)
- def askLockInformation(self, conn, tid):
+ def askLockInformation(self, conn, tid, oid_list):
if not tid in self.app.tm:
raise ProtocolError('Unknown transaction')
- self.app.tm.lock(tid)
+ self.app.tm.lock(tid, oid_list)
conn.answer(Packets.AnswerInformationLocked(tid))
def notifyUnlockInformation(self, conn, tid):
Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -154,7 +154,7 @@
self._load_lock_dict.clear()
self._uuid_dict.clear()
- def lock(self, tid):
+ def lock(self, tid, oid_list):
"""
Lock a transaction
"""
@@ -163,6 +163,12 @@
transaction.lock()
for oid in transaction.getOIDList():
self._load_lock_dict[oid] = tid
+ # check every object that should be locked
+ uuid = transaction.getUUID()
+ is_assigned = self._app.pt.isAssigned
+ for oid in oid_list:
+ if is_assigned(oid, uuid) and self._load_lock_dict.get(oid) != tid:
+ raise ValueError, 'Some locks are not held'
object_list = transaction.getObjectList()
# txn_info is None is the transaction information is not stored on
# this storage.
Modified: trunk/neo/tests/storage/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -132,19 +132,22 @@
""" Unknown transaction """
self.app.tm = Mock({'__contains__': False})
conn = self._getConnection()
+ oid_list = [self.getOID(1), self.getOID(2)]
tid = self.getNextTID()
handler = self.operation
- self.assertRaises(ProtocolError, handler.askLockInformation, conn, tid)
+ self.assertRaises(ProtocolError, handler.askLockInformation, conn, tid,
+ oid_list)
def test_askLockInformation2(self):
""" Lock transaction """
self.app.tm = Mock({'__contains__': True})
conn = self._getConnection()
tid = self.getNextTID()
- self.operation.askLockInformation(conn, tid)
+ oid_list = [self.getOID(1), self.getOID(2)]
+ self.operation.askLockInformation(conn, tid, oid_list)
calls = self.app.tm.mockGetNamedCalls('lock')
self.assertEqual(len(calls), 1)
- calls[0].checkArgs(tid)
+ calls[0].checkArgs(tid, oid_list)
self.checkAnswerInformationLocked(conn)
def test_notifyUnlockInformation1(self):
@@ -153,7 +156,7 @@
conn = self._getConnection()
tid = self.getNextTID()
handler = self.operation
- self.assertRaises(ProtocolError, handler.notifyUnlockInformation,
+ self.assertRaises(ProtocolError, handler.notifyUnlockInformation,
conn, tid)
def test_notifyUnlockInformation2(self):
Modified: trunk/neo/tests/storage/testTransactions.py
==============================================================================
--- trunk/neo/tests/storage/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testTransactions.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -78,6 +78,7 @@
self.app = Mock()
# no history
self.app.dm = Mock({'getObjectHistory': []})
+ self.app.pt = Mock({'isAssigned': True})
self.manager = TransactionManager(self.app)
self.ltid = None
@@ -85,6 +86,11 @@
tid = self.getNextTID(self.ltid)
oid_list = [self.getOID(1), self.getOID(2)]
return (tid, (oid_list, 'USER', 'DESC', 'EXT', False))
+
+ def _storeTransactionObjects(self, tid, txn):
+ for i, oid in enumerate(txn[0]):
+ self.manager.storeObject(self.getNewUUID(), tid, None,
+ oid, 1, str(i), '0' + str(i), None)
def _getObject(self, value):
oid = self.getOID(value)
@@ -115,7 +121,7 @@
self.manager.storeObject(uuid, tid, serial1, *object1)
self.manager.storeObject(uuid, tid, serial2, *object2)
self.assertTrue(tid in self.manager)
- self.manager.lock(tid)
+ self.manager.lock(tid, txn[0])
self._checkTransactionStored(tid, [object1, object2], txn)
self.manager.unlock(tid)
self.assertFalse(tid in self.manager)
@@ -130,8 +136,8 @@
# first transaction lock the object
self.manager.storeTransaction(uuid, tid1, *txn1)
self.assertTrue(tid1 in self.manager)
- self.manager.storeObject(uuid, tid1, serial, *obj)
- self.manager.lock(tid1)
+ self._storeTransactionObjects(tid1, txn1)
+ self.manager.lock(tid1, txn1[0])
# the second is delayed
self.manager.storeTransaction(uuid, tid2, *txn2)
self.assertTrue(tid2 in self.manager)
@@ -148,7 +154,8 @@
self.manager.storeTransaction(uuid, tid2, *txn2)
self.manager.storeObject(uuid, tid2, serial, *obj)
self.assertTrue(tid2 in self.manager)
- self.manager.lock(tid2)
+ self._storeTransactionObjects(tid2, txn2)
+ self.manager.lock(tid2, txn2[0])
# the previous it's not using the latest version
self.manager.storeTransaction(uuid, tid1, *txn1)
self.assertTrue(tid1 in self.manager)
@@ -167,8 +174,8 @@
self.assertRaises(ConflictError, self.manager.storeObject,
uuid, tid, serial, *obj)
- def testConflictWithTwoNodes(self):
- """ Ensure conflict/delaytion is working with different nodes"""
+ def testLockDelayed(self):
+ """ Check lock delaytion"""
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
self.assertNotEqual(uuid1, uuid2)
@@ -176,25 +183,41 @@
tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2)
- # first transaction lock the object
+ # first transaction lock objects
self.manager.storeTransaction(uuid1, tid1, *txn1)
self.assertTrue(tid1 in self.manager)
self.manager.storeObject(uuid1, tid1, serial1, *obj1)
- self.manager.lock(tid1)
+ self.manager.storeObject(uuid1, tid1, serial1, *obj2)
+ self.manager.lock(tid1, txn1[0])
# second transaction is delayed
self.manager.storeTransaction(uuid2, tid2, *txn2)
self.assertTrue(tid2 in self.manager)
- self.assertRaises(DelayedError, self.manager.storeObject,
+ self.assertRaises(DelayedError, self.manager.storeObject,
uuid2, tid2, serial1, *obj1)
- # the second transaction lock another object
+ self.assertRaises(DelayedError, self.manager.storeObject,
+ uuid2, tid2, serial2, *obj2)
+
+ def testLockConflict(self):
+ """ Check lock conflict """
+ uuid1 = self.getNewUUID()
+ uuid2 = self.getNewUUID()
+ self.assertNotEqual(uuid1, uuid2)
+ tid1, txn1 = self._getTransaction()
+ tid2, txn2 = self._getTransaction()
+ serial1, obj1 = self._getObject(1)
+ serial2, obj2 = self._getObject(2)
+ # the second transaction lock objects
self.manager.storeTransaction(uuid2, tid2, *txn2)
+ self.manager.storeObject(uuid2, tid2, serial1, *obj1)
self.manager.storeObject(uuid2, tid2, serial2, *obj2)
self.assertTrue(tid2 in self.manager)
- self.manager.lock(tid2)
+ self.manager.lock(tid2, txn1[0])
# the first get a conflict
self.manager.storeTransaction(uuid1, tid1, *txn1)
self.assertTrue(tid1 in self.manager)
- self.assertRaises(ConflictError, self.manager.storeObject,
+ self.assertRaises(ConflictError, self.manager.storeObject,
+ uuid1, tid1, serial1, *obj1)
+ self.assertRaises(ConflictError, self.manager.storeObject,
uuid1, tid1, serial2, *obj2)
def testAbortUnlocked(self):
@@ -215,15 +238,15 @@
""" Try to abort a locked transaction """
uuid = self.getNewUUID()
tid, txn = self._getTransaction()
- serial, obj = self._getObject(1)
- self.manager.storeTransaction(uuid, tid, *txn)
- self.manager.storeObject(uuid, tid, serial, *obj)
+ self.manager.storeTransaction(uuid, tid, *txn)
+ self._storeTransactionObjects(tid, txn)
# lock transaction
- self.manager.lock(tid)
+ self.manager.lock(tid, txn[0])
self.assertTrue(tid in self.manager)
self.manager.abort(tid, even_if_locked=False)
self.assertTrue(tid in self.manager)
- self.assertTrue(self.manager.loadLocked(obj[0]))
+ for oid in txn[0]:
+ self.assertTrue(self.manager.loadLocked(oid))
self._checkQueuedEventExecuted(number=0)
def testAbortForNode(self):
@@ -238,7 +261,8 @@
# node 2 owns tid2 & tid3 and lock tid2 only
self.manager.storeTransaction(uuid2, tid2, *txn2)
self.manager.storeTransaction(uuid2, tid3, *txn3)
- self.manager.lock(tid2)
+ self._storeTransactionObjects(tid2, txn2)
+ self.manager.lock(tid2, txn2[0])
self.assertTrue(tid1 in self.manager)
self.assertTrue(tid2 in self.manager)
self.assertTrue(tid3 in self.manager)
@@ -253,14 +277,14 @@
""" Reset the manager """
uuid = self.getNewUUID()
tid, txn = self._getTransaction()
- serial, obj = self._getObject(1)
- self.manager.storeTransaction(uuid, tid, *txn)
- self.manager.storeObject(uuid, tid, serial, *obj)
- self.manager.lock(tid)
+ self.manager.storeTransaction(uuid, tid, *txn)
+ self._storeTransactionObjects(tid, txn)
+ self.manager.lock(tid, txn[0])
self.assertTrue(tid in self.manager)
self.manager.reset()
self.assertFalse(tid in self.manager)
- self.assertFalse(self.manager.loadLocked(obj[0]))
+ for oid in txn[0]:
+ self.assertFalse(self.manager.loadLocked(oid))
def test_getObjectFromTransaction(self):
uuid = self.getNewUUID()
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Thu May 13 14:04:39 2010
@@ -290,10 +290,14 @@
self.assertEqual(ptid, tid)
def test_38_askLockInformation(self):
- tid = self.getNextTID()
- p = Packets.AskLockInformation(tid)
- ptid = p.decode()[0]
- self.assertEqual(ptid, tid)
+ oid1 = self.getNextTID()
+ oid2 = self.getNextTID()
+ oid_list = [oid1, oid2]
+ tid = self.getNextTID()
+ p = Packets.AskLockInformation(tid, oid_list)
+ ptid, p_oid_list = p.decode()
+ self.assertEqual(ptid, tid)
+ self.assertEqual(oid_list, p_oid_list)
def test_39_answerInformationLocked(self):
tid = self.getNextTID()
More information about the Neo-report
mailing list