[Neo-report] r2596 vincent - in /trunk/neo: ./ client/ client/handlers/ storage/ storage/h...

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Jan 6 15:08:59 CET 2011


Author: vincent
Date: Thu Jan  6 15:08:59 2011
New Revision: 2596

Log:
Resolve deadlocks detected by storage nodes.

Also reverts r2533: even if it's true that TIDs have no meaning at that level,
they are a handy way to prioritise a transaction over another, to break lock
cycles (aka deadlocks). This is the "detection" part of the change.
When a storage reports a deadlock, the client must store all already-stored
objects again with the "unlock" flag set.
Upon receiving those store requests, the storage must release the locks held
by the transaction on those objects, and requeue the store request. If the
client didn't hold any lock (the initial store was still in the queue), the
storage drops the second store request.
This doesn't solve possible deadlocks when the ZODB level sends us objects in
a different order (e.g. client 1 sending [OID1, OID2] & client 2 sending
[OID2, OID1]).
There is one important change compared to a plain revert of r2533: this code
queues older transactions and notifies younger ones of the deadlock. The
original code did it the other way around, which looks unfair to old
transactions (they would keep being delayed by younger ones, and would just
get older and older).

Added:
    trunk/neo/storage/exception.py
Modified:
    trunk/neo/client/app.py
    trunk/neo/client/handlers/storage.py
    trunk/neo/handler.py
    trunk/neo/protocol.py
    trunk/neo/storage/app.py
    trunk/neo/storage/handlers/__init__.py
    trunk/neo/storage/handlers/client.py
    trunk/neo/storage/transactions.py
    trunk/neo/tests/client/testClientApp.py
    trunk/neo/tests/functional/testClient.py
    trunk/neo/tests/storage/testClientHandler.py
    trunk/neo/tests/storage/testStorageApp.py
    trunk/neo/tests/storage/testStorageHandler.py
    trunk/neo/tests/storage/testTransactions.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -699,7 +699,7 @@ class Application(object):
         self._store(oid, serial, data)
         return None
 
-    def _store(self, oid, serial, data, data_serial=None):
+    def _store(self, oid, serial, data, data_serial=None, unlock=False):
         if data is None:
             # This is some undo: either a no-data object (undoing object
             # creation) or a back-pointer to an earlier revision (going back to
@@ -731,7 +731,7 @@ class Application(object):
         queue = self.local_var.queue
         add_involved_nodes = self.local_var.involved_nodes.add
         packet = Packets.AskStoreObject(oid, serial, compression,
-                 checksum, compressed_data, data_serial, self.local_var.tid)
+            checksum, compressed_data, data_serial, self.local_var.tid, unlock)
         for node, conn in self.cp.iterateForObject(oid, writable=True):
             try:
                 conn.ask(packet, on_timeout=on_timeout, queue=queue)
@@ -776,7 +776,37 @@ class Application(object):
             data = data_dict[oid]
             tid = local_var.tid
             resolved = False
-            if data is not None:
+            if conflict_serial == ZERO_TID:
+                # Storage refused us from taking object lock, to avoid a
+                # possible deadlock. TID is actually used for some kind of
+                # "locking priority": when a higher value has the lock,
+                # this means we stored objects "too late", and we would
+                # otherwise cause a deadlock.
+                # To recover, we must ask storages to release locks we
+                # hold (to let possibly-competing transactions acquire
+                # them), and requeue our already-sent store requests.
+                # XXX: currently, brute-force is implemented: we send
+                # object data again.
+                neo.logging.info('Deadlock avoidance triggered on %r:%r',
+                    dump(oid), dump(serial))
+                for store_oid, store_data in \
+                        local_var.data_dict.iteritems():
+                    store_serial = object_serial_dict[store_oid]
+                    if store_data is None:
+                        self.checkCurrentSerialInTransaction(store_oid,
+                            store_serial)
+                    else:
+                        if store_data is '':
+                            # Some undo
+                            neo.logging.warning('Deadlock avoidance cannot'
+                                ' reliably work with undo, this must be '
+                                'implemented.')
+                            break
+                        self._store(store_oid, store_serial, store_data,
+                            unlock=True)
+                else:
+                    resolved = True
+            elif data is not None:
                 new_data = tryToResolveConflict(oid, conflict_serial,
                     serial, data)
                 if new_data is not None:

Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -153,3 +153,6 @@ class StorageAnswersHandler(AnswerBaseHa
             # XXX: Not sure what to do in this case yet
             raise NotImplementedError
 
+    def alreadyPendingError(self, conn, message):
+        pass
+

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -230,7 +230,7 @@ class EventHandler(object):
         raise UnexpectedPacketError
 
     def askStoreObject(self, conn, oid, serial,
-                             compression, checksum, data, data_serial, tid):
+            compression, checksum, data, data_serial, tid, unlock):
         raise UnexpectedPacketError
 
     def answerStoreObject(self, conn, conflicting, oid, serial):
@@ -411,6 +411,9 @@ class EventHandler(object):
     def brokenNodeDisallowedError(self, conn, message):
         raise RuntimeError, 'broken node disallowed error: %s' % (message,)
 
+    def alreadyPendingError(self, conn, message):
+        neo.logging.error('already pending error: %s' % (message, ))
+
     def ack(self, conn, message):
         neo.logging.debug("no error message : %s" % (message))
 
@@ -516,6 +519,7 @@ class EventHandler(object):
         d[ErrorCodes.TID_NOT_FOUND] = self.tidNotFound
         d[ErrorCodes.PROTOCOL_ERROR] = self.protocolError
         d[ErrorCodes.BROKEN_NODE] = self.brokenNodeDisallowedError
+        d[ErrorCodes.ALREADY_PENDING] = self.alreadyPendingError
 
         return d
 

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -45,6 +45,7 @@ class ErrorCodes(Enum):
     TID_NOT_FOUND = Enum.Item(3)
     PROTOCOL_ERROR = Enum.Item(4)
     BROKEN_NODE = Enum.Item(5)
+    ALREADY_PENDING = Enum.Item(7)
 ErrorCodes = ErrorCodes()
 
 class ClusterStates(Enum):
@@ -912,26 +913,28 @@ class AskStoreObject(Packet):
     Ask to store an object. Send an OID, an original serial, a current
     transaction ID, and data. C -> S.
     """
-    _header_format = '!8s8s8sBL8s'
+    _header_format = '!8s8s8sBL8sB'
 
     @profiler_decorator
     def _encode(self, oid, serial, compression, checksum, data, data_serial,
-            tid):
+            tid, unlock):
         if serial is None:
             serial = INVALID_TID
         if data_serial is None:
             data_serial = INVALID_TID
+        unlock = unlock and 1 or 0
         return pack(self._header_format, oid, serial, tid, compression,
-                          checksum, data_serial) + _encodeString(data)
+                          checksum, data_serial, unlock) + _encodeString(data)
 
     def _decode(self, body):
         header_len = self._header_len
         r = unpack(self._header_format, body[:header_len])
-        oid, serial, tid, compression, checksum, data_serial = r
+        oid, serial, tid, compression, checksum, data_serial, unlock = r
         serial = _decodeTID(serial)
         data_serial = _decodeTID(data_serial)
         (data, _) = _decodeString(body, 'data', offset=header_len)
-        return (oid, serial, compression, checksum, data, data_serial, tid)
+        return (oid, serial, compression, checksum, data, data_serial, tid,
+            bool(unlock))
 
 class AnswerStoreObject(Packet):
     """
@@ -2065,6 +2068,7 @@ class ErrorRegistry(dict):
     OidDoesNotExist = register_error(ErrorCodes.OID_DOES_NOT_EXIST)
     NotReady = register_error(ErrorCodes.NOT_READY)
     Broken = register_error(ErrorCodes.BROKEN_NODE)
+    AlreadyPending = register_error(ErrorCodes.ALREADY_PENDING)
 
 Errors = ErrorRegistry()
 

Modified: trunk/neo/storage/app.py
==============================================================================
--- trunk/neo/storage/app.py [iso-8859-1] (original)
+++ trunk/neo/storage/app.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -29,6 +29,7 @@ from neo.storage.handlers import master,
 from neo.storage.replicator import Replicator
 from neo.storage.database import buildDatabaseManager
 from neo.storage.transactions import TransactionManager
+from neo.storage.exception import AlreadyPendingError
 from neo.connector import getConnectorHandler
 from neo.pt import PartitionTable
 from neo.util import dump
@@ -71,6 +72,7 @@ class Application(object):
 
         # operation related data
         self.event_queue = None
+        self.event_queue_keys = None
         self.operational = False
 
         # ready is True when operational and got all informations
@@ -193,6 +195,7 @@ class Application(object):
                     conn.close()
             # create/clear event queue
             self.event_queue = deque()
+            self.event_queue_keys = set()
             try:
                 self.verifyData()
                 self.initialize()
@@ -318,15 +321,25 @@ class Application(object):
             if not node.isHidden():
                 break
 
-    def queueEvent(self, some_callable, conn, *args):
+    def queueEvent(self, some_callable, conn, args, key=None,
+            raise_on_duplicate=True):
         msg_id = conn.getPeerId()
-        self.event_queue.append((some_callable, msg_id, conn, args))
+        keys = self.event_queue_keys
+        if raise_on_duplicate and key in keys:
+            raise AlreadyPendingError()
+        else:
+            self.event_queue.append((key, some_callable, msg_id, conn, args))
+            if key is not None:
+                keys.add(key)
 
     def executeQueuedEvents(self):
         l = len(self.event_queue)
         p = self.event_queue.popleft
+        remove = self.event_queue_keys.remove
         for _ in xrange(l):
-            some_callable, msg_id, conn, args = p()
+            key, some_callable, msg_id, conn, args = p()
+            if key is not None:
+                remove(key)
             if conn.isAborted() or conn.isClosed():
                 continue
             orig_msg_id = conn.getPeerId()

Added: trunk/neo/storage/exception.py
==============================================================================
--- trunk/neo/storage/exception.py (added)
+++ trunk/neo/storage/exception.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -0,0 +1,20 @@
+#
+# Copyright (C) 2010  Nexedi SA
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+class AlreadyPendingError(Exception):
+    pass
+

Modified: trunk/neo/storage/handlers/__init__.py
==============================================================================
--- trunk/neo/storage/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/__init__.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -82,7 +82,7 @@ class BaseClientAndStorageOperationHandl
         app = self.app
         if self.app.tm.loadLocked(oid):
             # Delay the response.
-            app.queueEvent(self.askObject, conn, oid, serial, tid)
+            app.queueEvent(self.askObject, conn, (oid, serial, tid))
             return
         o = self._askObject(oid, serial, tid)
         if o is None:

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -21,6 +21,7 @@ from neo.util import dump
 from neo.protocol import Packets, LockState, Errors
 from neo.storage.handlers import BaseClientAndStorageOperationHandler
 from neo.storage.transactions import ConflictError, DelayedError
+from neo.storage.exception import AlreadyPendingError
 import time
 
 # Log stores taking (incl. lock delays) more than this many seconds.
@@ -47,7 +48,7 @@ class ClientOperationHandler(BaseClientA
         conn.answer(Packets.AnswerStoreTransaction(tid))
 
     def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
-            data_serial, tid, request_time):
+            data_serial, tid, unlock, request_time):
         if tid not in self.app.tm:
             # transaction was aborted, cancel this event
             neo.logging.info('Forget store of %s:%s by %s delayed by %s',
@@ -58,15 +59,23 @@ class ClientOperationHandler(BaseClientA
             return
         try:
             self.app.tm.storeObject(tid, serial, oid, compression,
-                    checksum, data, data_serial)
+                    checksum, data, data_serial, unlock)
         except ConflictError, err:
             # resolvable or not
             tid_or_serial = err.getTID()
             conn.answer(Packets.AnswerStoreObject(1, oid, tid_or_serial))
         except DelayedError:
             # locked by a previous transaction, retry later
-            self.app.queueEvent(self._askStoreObject, conn, oid, serial,
-                compression, checksum, data, data_serial, tid, request_time)
+            # If we are unlocking, we want queueEvent to raise
+            # AlreadyPendingError, to avoid making lcient wait for an unneeded
+            # response.
+            try:
+                self.app.queueEvent(self._askStoreObject, conn, (oid, serial,
+                    compression, checksum, data, data_serial, tid,
+                    unlock, request_time), key=(oid, tid),
+                    raise_on_duplicate=unlock)
+            except AlreadyPendingError:
+                conn.answer(Errors.AlreadyPending(dump(oid)))
         else:
             if SLOW_STORE is not None:
                 duration = time.time() - request_time
@@ -75,7 +84,7 @@ class ClientOperationHandler(BaseClientA
             conn.answer(Packets.AnswerStoreObject(0, oid, serial))
 
     def askStoreObject(self, conn, oid, serial,
-                             compression, checksum, data, data_serial, tid):
+            compression, checksum, data, data_serial, tid, unlock):
         # register the transaction
         self.app.tm.register(conn.getUUID(), tid)
         if data_serial is not None:
@@ -84,7 +93,7 @@ class ClientOperationHandler(BaseClientA
             # delayed.
             data = None
         self._askStoreObject(conn, oid, serial, compression, checksum, data,
-            data_serial, tid, time.time())
+            data_serial, tid, unlock, time.time())
 
     def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list):
         app = self.app
@@ -172,8 +181,11 @@ class ClientOperationHandler(BaseClientA
                 err.getTID()))
         except DelayedError:
             # locked by a previous transaction, retry later
-            self.app.queueEvent(self._askCheckCurrentSerial, conn, tid, serial,
-                oid, request_time)
+            try:
+                self.app.queueEvent(self._askCheckCurrentSerial, conn, (tid,
+                    serial, oid, request_time), key=(oid, tid))
+            except AlreadyPendingError:
+                conn.answer(Errors.AlreadyPending(dump(oid)))
         else:
             if SLOW_STORE is not None:
                 duration = time.time() - request_time

Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -18,7 +18,7 @@
 from time import time
 import neo
 from neo.util import dump
-
+from neo.protocol import ZERO_TID
 
 class ConflictError(Exception):
     """
@@ -214,7 +214,7 @@ class TransactionManager(object):
     def getLockingTID(self, oid):
         return self._store_lock_dict.get(oid)
 
-    def lockObject(self, tid, serial, oid):
+    def lockObject(self, tid, serial, oid, unlock=False):
         """
             Take a write lock on given object, checking that "serial" is
             current.
@@ -224,6 +224,16 @@ class TransactionManager(object):
         """
         # check if the object if locked
         locking_tid = self._store_lock_dict.get(oid)
+        if locking_tid == tid and unlock:
+            neo.logging.info('Deadlock resolution on %r:%r', dump(oid),
+                dump(tid))
+            # A duplicate store means client is resolving a deadlock, so
+            # drop the lock it held on this object.
+            del self._store_lock_dict[oid]
+            # Give a chance to pending events to take that lock now.
+            self._app.executeQueuedEvents()
+            # Attemp to acquire lock again.
+            locking_tid = self._store_lock_dict.get(oid)
         if locking_tid == tid:
             neo.logging.info('Transaction %s storing %s more than once',
                 dump(tid), dump(oid))
@@ -236,24 +246,34 @@ class TransactionManager(object):
                 raise ConflictError(history_list[0][0])
             neo.logging.info('Transaction %s storing %s', dump(tid), dump(oid))
             self._store_lock_dict[oid] = tid
-        else:
-            # a previous transaction lock this object, retry later
+        elif locking_tid > tid:
+            # We have a smaller TID than locking transaction, so we are older:
+            # enter waiting queue so we are handled when lock gets released.
             neo.logging.info('Store delayed for %r:%r by %r', dump(oid),
                     dump(tid), dump(locking_tid))
             raise DelayedError
+        else:
+            # We have a bigger TID than locking transaction, so we are
+            # younger: this is a possible deadlock case, as we might already
+            # hold locks that older transaction is waiting upon. Make client
+            # release locks & reacquire them by notifying it of the possible
+            # deadlock.
+            neo.logging.info('Possible deadlock on %r:%r with %r',
+                dump(oid), dump(tid), dump(locking_tid))
+            raise ConflictError(ZERO_TID)
 
     def checkCurrentSerial(self, tid, serial, oid):
-        self.lockObject(tid, serial, oid)
+        self.lockObject(tid, serial, oid, unlock=True)
         assert tid in self, "Transaction not registered"
         transaction = self._transaction_dict[tid]
         transaction.addCheckedObject(oid)
 
     def storeObject(self, tid, serial, oid, compression, checksum, data,
-            value_serial):
+            value_serial, unlock=False):
         """
             Store an object received from client node
         """
-        self.lockObject(tid, serial, oid)
+        self.lockObject(tid, serial, oid, unlock=unlock)
         # store object
         assert tid in self, "Transaction not registered"
         transaction = self._transaction_dict[tid]

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -616,11 +616,11 @@ class ClientApplicationTests(NeoUnitTest
         """ check that abort is sent to all nodes involved in the transaction """
         app = self.getApp()
         # three partitions/storages: one per object/transaction
-        app.num_partitions = 3
+        app.num_partitions = num_partitions = 3
         app.num_replicas = 0
-        tid = self.makeTID(0)  # on partition 0
-        oid1 = self.makeOID(1) # on partition 1, conflicting
-        oid2 = self.makeOID(2) # on partition 2
+        tid = self.makeTID(num_partitions)  # on partition 0
+        oid1 = self.makeOID(num_partitions + 1) # on partition 1, conflicting
+        oid2 = self.makeOID(num_partitions + 2) # on partition 2
         # storage nodes
         uuid1, uuid2, uuid3 = [self.getNewUUID() for _ in range(3)]
         address1 = ('127.0.0.1', 10000)

Modified: trunk/neo/tests/functional/testClient.py
==============================================================================
--- trunk/neo/tests/functional/testClient.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/testClient.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -254,8 +254,8 @@ class ClientTests(NEOFunctionalTest):
             oid = st1.new_oid()
             rev = '\0' * 8
             data = zodb_pickle(PObject())
-            st1.tpc_begin(t1)
             st2.tpc_begin(t2)
+            st1.tpc_begin(t1)
             st1.store(oid, rev, data, '', t1)
             # this store will be delayed
             st2.store(oid, rev, data, '', t2)

Modified: trunk/neo/tests/storage/testClientHandler.py
==============================================================================
--- trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -45,6 +45,7 @@ class StorageClientHandlerTests(NeoUnitT
         self.app.store_lock_dict = {}
         self.app.load_lock_dict = {}
         self.app.event_queue = deque()
+        self.app.event_queue_keys = set()
         self.app.tm = Mock({'__contains__': True})
         # handler
         self.operation = ClientOperationHandler(self.app)
@@ -215,9 +216,9 @@ class StorageClientHandlerTests(NeoUnitT
         tid = self.getNextTID()
         oid, serial, comp, checksum, data = self._getObject()
         self.operation.askStoreObject(conn, oid, serial, comp, checksum, 
-                data, None, tid)
+                data, None, tid, False)
         self._checkStoreObjectCalled(tid, serial, oid, comp,
-                checksum, data, None)
+                checksum, data, None, False)
         pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
             decode=True)
         self.assertEqual(pconflicting, 0)
@@ -232,9 +233,9 @@ class StorageClientHandlerTests(NeoUnitT
         oid, serial, comp, checksum, data = self._getObject()
         data_tid = self.getNextTID()
         self.operation.askStoreObject(conn, oid, serial, comp, checksum, 
-                '', data_tid, tid)
+                '', data_tid, tid, False)
         self._checkStoreObjectCalled(tid, serial, oid, comp,
-                checksum, None, data_tid)
+                checksum, None, data_tid, False)
         pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
             decode=True)
         self.assertEqual(pconflicting, 0)
@@ -252,7 +253,7 @@ class StorageClientHandlerTests(NeoUnitT
         self.app.tm.storeObject = fakeStoreObject
         oid, serial, comp, checksum, data = self._getObject()
         self.operation.askStoreObject(conn, oid, serial, comp, checksum, 
-                data, None, tid)
+                data, None, tid, False)
         pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
             decode=True)
         self.assertEqual(pconflicting, 1)

Modified: trunk/neo/tests/storage/testStorageApp.py
==============================================================================
--- trunk/neo/tests/storage/testStorageApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageApp.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -23,6 +23,7 @@ from neo.protocol import CellStates
 from collections import deque
 from neo.pt import PartitionTable
 from neo.util import dump
+from neo.storage.exception import AlreadyPendingError
 
 class StorageAppTests(NeoUnitTestBase):
 
@@ -33,6 +34,7 @@ class StorageAppTests(NeoUnitTestBase):
         config = self.getStorageConfiguration(master_number=1)
         self.app = Application(config)
         self.app.event_queue = deque()
+        self.app.event_queue_keys = set()
 
     def test_01_loadPartitionTable(self):
         self.app.dm = Mock({
@@ -121,12 +123,20 @@ class StorageAppTests(NeoUnitTestBase):
         msg_id = 1325136
         event = Mock({'__repr__': 'event'})
         conn = Mock({'__repr__': 'conn', 'getPeerId': msg_id})
-        self.app.queueEvent(event, conn, "test")
+        key = 'foo'
+        self.app.queueEvent(event, conn, ("test", ), key=key)
         self.assertEqual(len(self.app.event_queue), 1)
-        _event, _msg_id, _conn, args = self.app.event_queue[0]
+        _key, _event, _msg_id, _conn, args = self.app.event_queue[0]
+        self.assertEqual(key, _key)
         self.assertEqual(msg_id, _msg_id)
         self.assertEqual(len(args), 1)
         self.assertEqual(args[0], "test")
+        self.assertRaises(AlreadyPendingError, self.app.queueEvent, event,
+            conn, ("test2", ), key=key)
+        self.assertEqual(len(self.app.event_queue), 1)
+        self.app.queueEvent(event, conn, ("test3", ), key=key,
+            raise_on_duplicate=False)
+        self.assertEqual(len(self.app.event_queue), 2)
 
     def test_03_executeQueuedEvents(self):
         self.assertEqual(len(self.app.event_queue), 0)
@@ -134,7 +144,7 @@ class StorageAppTests(NeoUnitTestBase):
         msg_id_2 = 1325137
         event = Mock({'__repr__': 'event'})
         conn = Mock({'__repr__': 'conn', 'getPeerId': ReturnValues(msg_id, msg_id_2)})
-        self.app.queueEvent(event, conn, "test")
+        self.app.queueEvent(event, conn, ("test", ))
         self.app.executeQueuedEvents()
         self.assertEquals(len(event.mockGetNamedCalls("__call__")), 1)
         call = event.mockGetNamedCalls("__call__")[0]

Modified: trunk/neo/tests/storage/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -43,6 +43,7 @@ class StorageStorageHandlerTests(NeoUnit
         self.app.store_lock_dict = {}
         self.app.load_lock_dict = {}
         self.app.event_queue = deque()
+        self.app.event_queue_keys = set()
         # handler
         self.operation = StorageOperationHandler(self.app)
         # set pmn

Modified: trunk/neo/tests/storage/testTransactions.py
==============================================================================
--- trunk/neo/tests/storage/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testTransactions.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -144,8 +144,8 @@ class TransactionManagerTests(NeoUnitTes
     def testDelayed(self):
         """ Two transactions, the first cause the second to be delayed """
         uuid = self.getNewUUID()
-        ttid1 = self.getNextTID()
         ttid2 = self.getNextTID()
+        ttid1 = self.getNextTID()
         tid1, txn1 = self._getTransaction()
         tid2, txn2 = self._getTransaction()
         serial, obj = self._getObject(1)
@@ -162,6 +162,27 @@ class TransactionManagerTests(NeoUnitTes
         self.assertRaises(DelayedError, self.manager.storeObject,
                 ttid2, serial, *obj)
 
+    def testUnresolvableConflict(self):
+        """ A newer transaction has already modified an object """
+        uuid = self.getNewUUID()
+        ttid2 = self.getNextTID()
+        ttid1 = self.getNextTID()
+        tid1, txn1 = self._getTransaction()
+        tid2, txn2 = self._getTransaction()
+        serial, obj = self._getObject(1)
+        # the (later) transaction lock (change) the object
+        self.manager.register(uuid, ttid2)
+        self.manager.storeTransaction(ttid2, *txn2)
+        self.assertTrue(ttid2 in self.manager)
+        self._storeTransactionObjects(ttid2, txn2)
+        self.manager.lock(ttid2, tid2, txn2[0])
+        # the previous it's not using the latest version
+        self.manager.register(uuid, ttid1)
+        self.manager.storeTransaction(ttid1, *txn1)
+        self.assertTrue(ttid1 in self.manager)
+        self.assertRaises(ConflictError, self.manager.storeObject,
+                ttid1, serial, *obj)
+
     def testResolvableConflict(self):
         """ Try to store an object with the lastest revision """
         uuid = self.getNewUUID()
@@ -180,8 +201,8 @@ class TransactionManagerTests(NeoUnitTes
         uuid1 = self.getNewUUID()
         uuid2 = self.getNewUUID()
         self.assertNotEqual(uuid1, uuid2)
-        ttid1 = self.getNextTID()
         ttid2 = self.getNextTID()
+        ttid1 = self.getNextTID()
         tid1, txn1 = self._getTransaction()
         tid2, txn2 = self._getTransaction()
         serial1, obj1 = self._getObject(1)
@@ -207,8 +228,8 @@ class TransactionManagerTests(NeoUnitTes
         uuid1 = self.getNewUUID()
         uuid2 = self.getNewUUID()
         self.assertNotEqual(uuid1, uuid2)
-        ttid1 = self.getNextTID()
         ttid2 = self.getNextTID()
+        ttid1 = self.getNextTID()
         tid1, txn1 = self._getTransaction()
         tid2, txn2 = self._getTransaction()
         serial1, obj1 = self._getObject(1)
@@ -220,13 +241,13 @@ class TransactionManagerTests(NeoUnitTes
         self.manager.storeObject(ttid2, serial2, *obj2)
         self.assertTrue(ttid2 in self.manager)
         self.manager.lock(ttid2, tid2, txn1[0])
-        # the first get a delay, as nothing is committed yet
+        # the first get a conflict
         self.manager.register(uuid1, ttid1)
         self.manager.storeTransaction(ttid1, *txn1)
         self.assertTrue(ttid1 in self.manager)
-        self.assertRaises(DelayedError, self.manager.storeObject,
+        self.assertRaises(ConflictError, self.manager.storeObject,
                 ttid1, serial1, *obj1)
-        self.assertRaises(DelayedError, self.manager.storeObject,
+        self.assertRaises(ConflictError, self.manager.storeObject,
                 ttid1, serial2, *obj2)
 
     def testAbortUnlocked(self):

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Thu Jan  6 15:08:59 2011
@@ -352,8 +352,10 @@ class ProtocolTests(NeoUnitTestBase):
         serial = self.getNextTID()
         tid = self.getNextTID()
         tid2 = self.getNextTID()
-        p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid2, tid)
-        poid, pserial, compression, checksum, data, ptid2, ptid = p.decode()
+        unlock = False
+        p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid2, tid, unlock)
+        poid, pserial, compression, checksum, data, ptid2, ptid, punlock = \
+            p.decode()
         self.assertEqual(oid, poid)
         self.assertEqual(serial, pserial)
         self.assertEqual(tid, ptid)
@@ -361,6 +363,7 @@ class ProtocolTests(NeoUnitTestBase):
         self.assertEqual(compression, 1)
         self.assertEqual(checksum, 55)
         self.assertEqual(data, "to")
+        self.assertEqual(unlock, punlock)
 
     def test_46_answerStoreObject(self):
         oid = self.getNextTID()




More information about the Neo-report mailing list