[Neo-report] r2532 vincent - in /trunk/neo: ./ client/ storage/database/ storage/handlers/...

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Dec 14 16:56:54 CET 2010


Author: vincent
Date: Tue Dec 14 16:56:53 2010
New Revision: 2532

Log:
Implement MVCC.

Remove round-trip to master upon "load" call.
Move load/loadBefore/loadSerial/loadEx from app.py to Storage.py.
This is required to get rid of master node round-trip upon each "load"
call.
Get rid of no-op-ish "sync" implementation.
Separate "undoing transaction ID" from "undoing transaction database
snapshot" when undoing.

Modified:
    trunk/neo/client/Storage.py
    trunk/neo/client/app.py
    trunk/neo/client/iterator.py
    trunk/neo/protocol.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/handlers/client.py
    trunk/neo/tests/client/testClientApp.py
    trunk/neo/tests/functional/testClient.py
    trunk/neo/tests/storage/testClientHandler.py
    trunk/neo/tests/storage/testStorageDBTests.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -20,6 +20,8 @@ from zope.interface import implements
 import ZODB.interfaces
 
 from neo import setupLog
+from neo.util import add64
+from neo.protocol import ZERO_TID
 from neo.client.app import Application
 from neo.client.exception import NEOStorageNotFoundError
 from neo.client.exception import NEOStorageDoesNotExistError
@@ -42,6 +44,8 @@ class Storage(BaseStorage.BaseStorage,
               ConflictResolution.ConflictResolvingStorage):
     """Wrapper class for neoclient."""
 
+    _snapshot_tid = None
+
     implements(
         ZODB.interfaces.IStorage,
         # "restore" missing for the moment, but "store" implements this
@@ -54,19 +58,67 @@ class Storage(BaseStorage.BaseStorage,
         ZODB.interfaces.IStorageUndoable,
         ZODB.interfaces.IExternalGC,
         ZODB.interfaces.ReadVerifyingStorage,
+        ZODB.interfaces.IMVCCStorage,
     )
 
     def __init__(self, master_nodes, name, connector=None, read_only=False,
-                 compress=None, logfile=None, verbose=False, **kw):
+            compress=None, logfile=None, verbose=False,
+            _app=None, _cache=None,
+            **kw):
+        """
+        Do not pass those parameters (used internally):
+        _app
+        _cache
+        """
         if compress is None:
             compress = True
         setupLog('CLIENT', filename=logfile, verbose=verbose)
         BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
         # Warning: _is_read_only is used in BaseStorage, do not rename it.
         self._is_read_only = read_only
-        self.app = Application(master_nodes, name, connector,
-            compress=compress)
-        self._cache = DummyCache(self.app)
+        if _app is None:
+            _app = Application(master_nodes, name, connector,
+                compress=compress)
+            assert _cache is None
+            _cache = DummyCache(_app)
+        self.app = _app
+        assert _cache is not None
+        self._cache = _cache
+        # Used to clone self (see new_instance & IMVCCStorage definition).
+        self._init_args = (master_nodes, name)
+        self._init_kw = {
+            'connector': connector,
+            'read_only': read_only,
+            'compress': compress,
+            'logfile': logfile,
+            'verbose': verbose,
+            '_app': _app,
+            '_cache': _cache,
+        }
+
+    def _getSnapshotTID(self):
+        """
+        Get the highest TID visible for current transaction.
+        First call sets this snapshot by asking master node most recent
+        committed TID.
+        As a (positive) side-effect, this forces us to handle all pending
+        invalidations, so we get a very recent view of the database (which is
+        good when multiple databases are used in the same program with some
+        amount of referential integrity).
+        """
+        tid = self._snapshot_tid
+        if tid is None:
+            tid = self.lastTransaction()
+            if tid is ZERO_TID:
+                raise NEOStorageDoesNotExistError('No transaction in storage')
+            # Increment by one, as we will use this as an excluded upper
+            # bound (loadBefore).
+            tid = add64(tid, 1)
+            self._snapshot_tid = tid
+        return tid
+
+    def _load(self, *args, **kw):
+        return self.app.load(self._getSnapshotTID(), *args, **kw)
 
     def load(self, oid, version=''):
         # XXX: interface deifinition states that version parameter is
@@ -74,7 +126,7 @@ class Storage(BaseStorage.BaseStorage,
         # it optional.
         assert version == '', 'Versions are not supported'
         try:
-            return self.app.load(oid=oid)
+            return self._load(oid)[:2]
         except NEOStorageNotFoundError:
             raise POSException.POSKeyError(oid)
 
@@ -97,11 +149,14 @@ class Storage(BaseStorage.BaseStorage,
 
     @check_read_only
     def tpc_abort(self, transaction):
+        self.sync()
         return self.app.tpc_abort(transaction=transaction)
 
     def tpc_finish(self, transaction, f=None):
-        return self.app.tpc_finish(transaction=transaction,
+        result = self.app.tpc_finish(transaction=transaction,
             tryToResolveConflict=self.tryToResolveConflict, f=f)
+        self.sync()
+        return result
 
     @check_read_only
     def store(self, oid, serial, data, version, transaction):
@@ -117,13 +172,13 @@ class Storage(BaseStorage.BaseStorage,
     # mutliple revisions
     def loadSerial(self, oid, serial):
         try:
-            return self.app.loadSerial(oid=oid, serial=serial)
+            return self._load(oid, serial=serial)[0]
         except NEOStorageNotFoundError:
             raise POSException.POSKeyError(oid)
 
     def loadBefore(self, oid, tid):
         try:
-            return self.app.loadBefore(oid=oid, tid=tid)
+            return self._load(oid, tid=tid)
         except NEOStorageDoesNotExistError:
             raise POSException.POSKeyError(oid)
         except NEOStorageNotFoundError:
@@ -135,8 +190,8 @@ class Storage(BaseStorage.BaseStorage,
     # undo
     @check_read_only
     def undo(self, transaction_id, txn):
-        return self.app.undo(undone_tid=transaction_id, txn=txn,
-            tryToResolveConflict=self.tryToResolveConflict)
+        return self.app.undo(self._getSnapshotTID(), undone_tid=transaction_id,
+            txn=txn, tryToResolveConflict=self.tryToResolveConflict)
 
 
     @check_read_only
@@ -159,9 +214,10 @@ class Storage(BaseStorage.BaseStorage,
 
     def loadEx(self, oid, version):
         try:
-            return self.app.loadEx(oid=oid, version=version)
+            data, serial, _ = self._load(oid)
         except NEOStorageNotFoundError:
             raise POSException.POSKeyError(oid)
+        return data, serial, ''
 
     def __len__(self):
         return self.app.getStorageSize()
@@ -172,8 +228,8 @@ class Storage(BaseStorage.BaseStorage,
     def history(self, oid, version=None, size=1, filter=None):
         return self.app.history(oid, version, size, filter)
 
-    def sync(self):
-        self.app.sync()
+    def sync(self, force=True):
+        self._snapshot_tid = None
 
     def copyTransactionsFrom(self, source, verbose=False):
         """ Zope compliant API """
@@ -217,9 +273,23 @@ class Storage(BaseStorage.BaseStorage,
     def close(self):
         self.app.close()
 
-    def getTID(self, oid):
-        return self.app.getLastTID(oid)
+    def getTid(self, oid):
+        try:
+            return self.app.getLastTID(oid)
+        except NEOStorageNotFoundError:
+            raise KeyError
 
     def checkCurrentSerialInTransaction(self, oid, serial, transaction):
         self.app.checkCurrentSerialInTransaction(oid, serial, transaction)
 
+    def new_instance(self):
+        return Storage(*self._init_args, **self._init_kw)
+
+    def poll_invalidations(self):
+        """
+        Nothing to do, NEO doesn't need any polling.
+        """
+        pass
+
+    release = sync
+

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -115,7 +115,6 @@ class ThreadContext(object):
             'asked_object': 0,
             'undo_object_tid_dict': {},
             'involved_nodes': set(),
-            'barrier_done': False,
             'last_transaction': None,
         }
 
@@ -564,10 +563,13 @@ class Application(object):
         return int(u64(self.last_oid))
 
     @profiler_decorator
-    def _load(self, oid, serial=None, tid=None):
+    def load(self, snapshot_tid, oid, serial=None, tid=None):
         """
         Internal method which manage load, loadSerial and loadBefore.
         OID and TID (serial) parameters are expected packed.
+        snapshot_tid
+            First TID not visible to current transaction.
+            Set to None for no limit.
         oid
             OID of object to get.
         serial
@@ -595,15 +597,19 @@ class Application(object):
         """
         # TODO:
         # - rename parameters (here and in handlers & packet definitions)
+        if snapshot_tid is not None:
+            if serial is None:
+                if tid is None:
+                    tid = snapshot_tid
+                else:
+                    tid = min(tid, snapshot_tid)
+            # XXX: we must not clamp serial with snapshot_tid, as loadSerial is
+            # used during conflict resolution to load object's current version,
+            # which is not visible to us normaly (it was committed after our
+            # snapshot was taken).
 
         self._load_lock_acquire()
         try:
-            # Once per transaction, upon first load, trigger a barrier so we
-            # handle all pending invalidations, so the snapshot of the database
-            # is as up-to-date as possible.
-            if not self.local_var.barrier_done:
-                self.invalidationBarrier()
-                self.local_var.barrier_done = True
             try:
                 result = self._loadFromCache(oid, serial, tid)
             except KeyError:
@@ -702,34 +708,6 @@ class Application(object):
             self._cache_lock_release()
 
     @profiler_decorator
-    def load(self, oid, version=None):
-        """Load an object for a given oid."""
-        result = self._load(oid)[:2]
-        # Start a network barrier, so we get all invalidations *after* we
-        # received data. This ensures we get any invalidation message that
-        # would have been about the version we loaded.
-        # Those invalidations are checked at ZODB level, so it decides if
-        # loaded data can be handed to current transaction or if a separate
-        # loadBefore call is required.
-        # XXX: A better implementation is required to improve performances
-        self.invalidationBarrier()
-        return result
-
-    @profiler_decorator
-    def loadSerial(self, oid, serial):
-        """Load an object for a given oid and serial."""
-        neo.logging.debug('loading %s at %s', dump(oid), dump(serial))
-        return self._load(oid, serial=serial)[0]
-
-
-    @profiler_decorator
-    def loadBefore(self, oid, tid):
-        """Load an object for a given oid before tid committed."""
-        neo.logging.debug('loading %s before %s', dump(oid), dump(tid))
-        return self._load(oid, tid=tid)
-
-
-    @profiler_decorator
     def tpc_begin(self, transaction, tid=None, status=' '):
         """Begin a new transaction."""
         # First get a transaction, only one is allowed at a time
@@ -1047,7 +1025,7 @@ class Application(object):
         finally:
             self._load_lock_release()
 
-    def undo(self, undone_tid, txn, tryToResolveConflict):
+    def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
         if txn is not self.local_var.txn:
             raise StorageTransactionError(self, undone_tid)
 
@@ -1106,7 +1084,7 @@ class Application(object):
             cell_list.sort(key=getCellSortKey)
             storage_conn = getConnForCell(cell_list[0])
             storage_conn.ask(Packets.AskObjectUndoSerial(self.local_var.tid,
-                undone_tid, oid_list), queue=queue)
+                snapshot_tid, undone_tid, oid_list), queue=queue)
 
         # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
         # meaning that objects in transaction's oid_list do not exist any
@@ -1133,9 +1111,9 @@ class Application(object):
                 # object. This is an undo conflict, try to resolve it.
                 try:
                     # Load the latest version we are supposed to see
-                    data = self.loadSerial(oid, current_serial)
+                    data = self.load(snapshot_tid, oid, serial=current_serial)[0]
                     # Load the version we were undoing to
-                    undo_data = self.loadSerial(oid, undo_serial)
+                    undo_data = self.load(snapshot_tid, oid, serial=undo_serial)[0]
                 except NEOStorageNotFoundError:
                     raise UndoError('Object not found while resolving undo '
                         'conflict')
@@ -1346,10 +1324,6 @@ class Application(object):
             raise StorageTransactionError(self, transaction)
         return '', []
 
-    def loadEx(self, oid, version):
-        data, serial = self.load(oid=oid)
-        return data, serial, ''
-
     def __del__(self):
         """Clear all connection."""
         # Due to bug in ZODB, close is not always called when shutting
@@ -1367,9 +1341,6 @@ class Application(object):
     def invalidationBarrier(self):
         self._askPrimary(Packets.AskBarrier())
 
-    def sync(self):
-        self._waitAnyMessage(False)
-
     def setNodeReady(self):
         self.local_var.node_ready = True
 
@@ -1401,7 +1372,7 @@ class Application(object):
             self._cache_lock_release()
 
     def getLastTID(self, oid):
-        return self._load(oid)[1]
+        return self.load(None, oid)[1]
 
     def checkCurrentSerialInTransaction(self, oid, serial, transaction):
         local_var = self.local_var

Modified: trunk/neo/client/iterator.py
==============================================================================
--- trunk/neo/client/iterator.py [iso-8859-1] (original)
+++ trunk/neo/client/iterator.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -61,7 +61,7 @@ class Transaction(BaseStorage.Transactio
         while oid_index < oid_len:
             oid = oid_list[oid_index]
             try:
-                data, _, next_tid = app._load(oid, serial=self.tid)
+                data, _, next_tid = app.load(None, oid, serial=self.tid)
             except NEOStorageCreationUndoneError:
                 data = next_tid = None
             except NEOStorageNotFoundError:

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -1507,12 +1507,12 @@ class AskObjectUndoSerial(Packet):
     for a list of OIDs.
     C -> S
     """
-    _header_format = '!8s8sL'
+    _header_format = '!8s8s8sL'
 
-    def _encode(self, tid, undone_tid, oid_list):
+    def _encode(self, tid, ltid, undone_tid, oid_list):
         body = StringIO()
         write = body.write
-        write(pack(self._header_format, tid, undone_tid, len(oid_list)))
+        write(pack(self._header_format, tid, ltid, undone_tid, len(oid_list)))
         for oid in oid_list:
             write(oid)
         return body.getvalue()
@@ -1520,10 +1520,10 @@ class AskObjectUndoSerial(Packet):
     def _decode(self, body):
         body = StringIO(body)
         read = body.read
-        tid, undone_tid, oid_list_len = unpack(self._header_format,
+        tid, ltid, undone_tid, oid_list_len = unpack(self._header_format,
             read(self._header_len))
         oid_list = [read(8) for _ in xrange(oid_list_len)]
-        return tid, undone_tid, oid_list
+        return tid, ltid, undone_tid, oid_list
 
 class AnswerObjectUndoSerial(Packet):
     """

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -324,12 +324,15 @@ class DatabaseManager(object):
         """
         raise NotImplementedError
 
-    def findUndoTID(self, oid, tid, undone_tid, transaction_object):
+    def findUndoTID(self, oid, tid, ltid, undone_tid, transaction_object):
         """
         oid
             Object OID
         tid
             Transation doing the undo
+        ltid
+            Upper (exclued) bound of transactions visible to transaction doing
+            the undo.
         undone_tid
             Transaction to undo
         transaction_object
@@ -355,16 +358,17 @@ class DatabaseManager(object):
         p64 = util.p64
         oid = u64(oid)
         tid = u64(tid)
+        ltid = u64(ltid)
         undone_tid = u64(undone_tid)
         _getDataTID = self._getDataTID
         if transaction_object is not None:
             toid, tcompression, tchecksum, tdata, tvalue_serial = \
                 transaction_object
             current_tid, current_data_tid = self._getDataTIDFromData(oid,
-                (tid, None, tcompression, tchecksum, tdata,
+                (ltid, None, tcompression, tchecksum, tdata,
                 u64(tvalue_serial)))
         else:
-            current_tid, current_data_tid = _getDataTID(oid, before_tid=tid)
+            current_tid, current_data_tid = _getDataTID(oid, before_tid=ltid)
         if current_tid is None:
             return (None, None, False)
         found_undone_tid, undone_data_tid = _getDataTID(oid, tid=undone_tid)

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -102,14 +102,14 @@ class ClientOperationHandler(BaseClientA
                              app.pt.getPartitions(), partition_list)
         conn.answer(Packets.AnswerTIDs(tid_list))
 
-    def askObjectUndoSerial(self, conn, tid, undone_tid, oid_list):
+    def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
         app = self.app
         findUndoTID = app.dm.findUndoTID
         getObjectFromTransaction = app.tm.getObjectFromTransaction
         object_tid_dict = {}
         for oid in oid_list:
             current_serial, undo_serial, is_current = findUndoTID(oid, tid,
-                undone_tid, getObjectFromTransaction(tid, oid))
+                ltid, undone_tid, getObjectFromTransaction(tid, oid))
             if current_serial is None:
                 p = Errors.OidNotFound(dump(oid))
                 break

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -206,6 +206,7 @@ class ClientApplicationTests(NeoUnitTest
         oid = self.makeOID()
         tid1 = self.makeTID(1)
         tid2 = self.makeTID(2)
+        snapshot_tid = self.makeTID(3)
         an_object = (1, oid, tid1, tid2, 0, makeChecksum('OBJ'), 'OBJ', None)
         # connection to SN close
         self.assertTrue((oid, tid1) not in mq)
@@ -221,7 +222,7 @@ class ClientApplicationTests(NeoUnitTest
         app.pt = Mock({ 'getCellListForOID': [cell, ], })
         app.cp = Mock({ 'getConnForCell' : conn})
         Application._waitMessage = self._waitMessage
-        self.assertRaises(NEOStorageError, app.load, oid)
+        self.assertRaises(NEOStorageError, app.load, snapshot_tid, oid)
         self.checkAskObject(conn)
         Application._waitMessage = _waitMessage
         # object not found in NEO -> NEOStorageNotFoundError
@@ -236,7 +237,7 @@ class ClientApplicationTests(NeoUnitTest
         })
         app.pt = Mock({ 'getCellListForOID': [cell, ], })
         app.cp = Mock({ 'getConnForCell' : conn})
-        self.assertRaises(NEOStorageNotFoundError, app.load, oid)
+        self.assertRaises(NEOStorageNotFoundError, app.load, snapshot_tid, oid)
         self.checkAskObject(conn)
         # object found on storage nodes and put in cache
         packet = Packets.AnswerObject(*an_object[1:])
@@ -253,7 +254,7 @@ class ClientApplicationTests(NeoUnitTest
             'getNextId': 1,
             'fakeReceived': answer_barrier,
         })
-        result = app.load(oid)
+        result = app.load(snapshot_tid, oid)[:2]
         self.assertEquals(result, ('OBJ', tid1))
         self.checkAskObject(conn)
         self.assertTrue((oid, tid1) in mq)
@@ -262,7 +263,7 @@ class ClientApplicationTests(NeoUnitTest
             'getAddress': ('127.0.0.1', 0),
         })
         app.cp = Mock({ 'getConnForCell' : conn})
-        result = app.load(oid)
+        result = app.load(snapshot_tid, oid)[:2]
         self.assertEquals(result, ('OBJ', tid1))
         self.checkNoPacketSent(conn)
 
@@ -273,6 +274,9 @@ class ClientApplicationTests(NeoUnitTest
         oid = self.makeOID()
         tid1 = self.makeTID(1)
         tid2 = self.makeTID(2)
+        snapshot_tid = self.makeTID(3)
+        def loadSerial(oid, serial):
+            return app.load(snapshot_tid, oid, serial=serial)[0]
         # object not found in NEO -> NEOStorageNotFoundError
         self.assertTrue((oid, tid1) not in mq)
         self.assertTrue((oid, tid2) not in mq)
@@ -285,7 +289,7 @@ class ClientApplicationTests(NeoUnitTest
         })
         app.pt = Mock({ 'getCellListForOID': [cell, ], })
         app.cp = Mock({ 'getConnForCell' : conn})
-        self.assertRaises(NEOStorageNotFoundError, app.loadSerial, oid, tid2)
+        self.assertRaises(NEOStorageNotFoundError, loadSerial, oid, tid2)
         self.checkAskObject(conn)
         # object should not have been cached
         self.assertFalse((oid, tid2) in mq)
@@ -302,7 +306,7 @@ class ClientApplicationTests(NeoUnitTest
         })
         app.cp = Mock({ 'getConnForCell' : conn})
         app.local_var.asked_object = another_object[:-1]
-        result = app.loadSerial(oid, tid1)
+        result = loadSerial(oid, tid1)
         self.assertEquals(result, 'RIGHT')
         self.checkAskObject(conn)
         self.assertTrue((oid, tid2) in mq)
@@ -315,6 +319,9 @@ class ClientApplicationTests(NeoUnitTest
         tid1 = self.makeTID(1)
         tid2 = self.makeTID(2)
         tid3 = self.makeTID(3)
+        snapshot_tid = self.makeTID(4)
+        def loadBefore(oid, tid):
+            return app.load(snapshot_tid, oid, tid=tid)
         # object not found in NEO -> NEOStorageDoesNotExistError
         self.assertTrue((oid, tid1) not in mq)
         self.assertTrue((oid, tid2) not in mq)
@@ -327,7 +334,7 @@ class ClientApplicationTests(NeoUnitTest
         })
         app.pt = Mock({ 'getCellListForOID': [cell, ], })
         app.cp = Mock({ 'getConnForCell' : conn})
-        self.assertRaises(NEOStorageDoesNotExistError, app.loadBefore, oid, tid2)
+        self.assertRaises(NEOStorageDoesNotExistError, loadBefore, oid, tid2)
         self.checkAskObject(conn)
         # no visible version -> NEOStorageNotFoundError
         an_object = (1, oid, INVALID_SERIAL, None, 0, 0, '', None)
@@ -339,7 +346,7 @@ class ClientApplicationTests(NeoUnitTest
         })
         app.cp = Mock({ 'getConnForCell' : conn})
         app.local_var.asked_object = an_object[:-1]
-        self.assertRaises(NEOStorageError, app.loadBefore, oid, tid1)
+        self.assertRaises(NEOStorageError, loadBefore, oid, tid1)
         # object should not have been cached
         self.assertFalse((oid, tid1) in mq)
         # as for loadSerial, the object is cached but should be loaded from db
@@ -356,7 +363,7 @@ class ClientApplicationTests(NeoUnitTest
         })
         app.cp = Mock({ 'getConnForCell' : conn})
         app.local_var.asked_object = another_object
-        result = app.loadBefore(oid, tid3)
+        result = loadBefore(oid, tid3)
         self.assertEquals(result, ('RIGHT', tid2, tid3))
         self.checkAskObject(conn)
         self.assertTrue((oid, tid1) in mq)
@@ -765,6 +772,7 @@ class ClientApplicationTests(NeoUnitTest
         # invalid transaction
         app = self.getApp()
         tid = self.makeTID()
+        snapshot_tid = self.getNextTID()
         txn = self.makeTransactionObject()
         marker = []
         def tryToResolveConflict(oid, conflict_serial, serial, data):
@@ -774,8 +782,8 @@ class ClientApplicationTests(NeoUnitTest
         self.assertFalse(app.local_var.txn is txn)
         conn = Mock()
         cell = Mock()
-        self.assertRaises(StorageTransactionError, app.undo, tid, txn,
-            tryToResolveConflict)
+        self.assertRaises(StorageTransactionError, app.undo, snapshot_tid, tid,
+            txn, tryToResolveConflict)
         # no packet sent
         self.checkNoPacketSent(conn)
         self.checkNoPacketSent(app.master_conn)
@@ -808,10 +816,10 @@ class ClientApplicationTests(NeoUnitTest
             def pending(self, queue): 
                 return not queue.empty()
         app.dispatcher = Dispatcher()
-        def loadSerial(oid, tid):
+        def load(snapshot_tid, oid, serial):
             self.assertEqual(oid, oid0)
-            return {tid0: 'dummy', tid2: 'cdummy'}[tid]
-        app.loadSerial = loadSerial
+            return ({tid0: 'dummy', tid2: 'cdummy'}[serial], None, None)
+        app.load = load
         store_marker = []
         def _store(oid, serial, data, data_serial=None):
             store_marker.append((oid, serial, data, data_serial))
@@ -832,6 +840,7 @@ class ClientApplicationTests(NeoUnitTest
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
         tid3 = self.getNextTID()
+        snapshot_tid = self.getNextTID()
         app, conn, store_marker = self._getAppForUndoTests(oid0, tid0, tid1,
             tid2)
         undo_serial = Packets.AnswerObjectUndoSerial({
@@ -845,7 +854,7 @@ class ClientApplicationTests(NeoUnitTest
             return 'solved'
         # The undo
         txn = self.beginTransaction(app, tid=tid3)
-        app.undo(tid1, txn, tryToResolveConflict)
+        app.undo(snapshot_tid, tid1, txn, tryToResolveConflict)
         # Checking what happened
         moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0]
         self.assertEqual(moid, oid0)
@@ -872,6 +881,7 @@ class ClientApplicationTests(NeoUnitTest
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
         tid3 = self.getNextTID()
+        snapshot_tid = self.getNextTID()
         undo_serial = Packets.AnswerObjectUndoSerial({
             oid0: (tid2, tid0, False)})
         undo_serial.setId(2)
@@ -885,7 +895,8 @@ class ClientApplicationTests(NeoUnitTest
             return None
         # The undo
         txn = self.beginTransaction(app, tid=tid3)
-        self.assertRaises(UndoError, app.undo, tid1, txn, tryToResolveConflict)
+        self.assertRaises(UndoError, app.undo, snapshot_tid, tid1, txn,
+            tryToResolveConflict)
         # Checking what happened
         moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0]
         self.assertEqual(moid, oid0)
@@ -903,7 +914,8 @@ class ClientApplicationTests(NeoUnitTest
             raise ConflictError
         # The undo
         app.local_var.queue.put((conn, undo_serial))
-        self.assertRaises(UndoError, app.undo, tid1, txn, tryToResolveConflict)
+        self.assertRaises(UndoError, app.undo, snapshot_tid, tid1, txn,
+            tryToResolveConflict)
         # Checking what happened
         moid, mconflict_serial, mserial, mdata, mcommittedData = marker[0]
         self.assertEqual(moid, oid0)
@@ -925,6 +937,7 @@ class ClientApplicationTests(NeoUnitTest
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
         tid3 = self.getNextTID()
+        snapshot_tid = self.getNextTID()
         transaction_info = Packets.AnswerTransactionInformation(tid1, '', '',
             '', False, (oid0, ))
         transaction_info.setId(1)
@@ -940,7 +953,7 @@ class ClientApplicationTests(NeoUnitTest
                 'is no conflict in this test !'
         # The undo
         txn = self.beginTransaction(app, tid=tid3)
-        app.undo(tid1, txn, tryToResolveConflict)
+        app.undo(snapshot_tid, tid1, txn, tryToResolveConflict)
         # Checking what happened
         moid, mserial, mdata, mdata_serial = store_marker[0]
         self.assertEqual(moid, oid0)

Modified: trunk/neo/tests/functional/testClient.py
==============================================================================
--- trunk/neo/tests/functional/testClient.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/testClient.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -145,10 +145,14 @@ class ClientTests(NEOFunctionalTest):
         """ Check transaction isolation within zope connection """
         self.__setup()
         t, c = self.makeTransaction()
-        c.root()['item'] = 0
+        root = c.root()
+        root['item'] = 0
+        root['other'] = 'bla'
         t.commit()
         t1, c1 = self.makeTransaction()
         t2, c2 = self.makeTransaction()
+        # Makes c2 take a snapshot of database state
+        c2.root()['other']
         c1.root()['item'] = 1
         t1.commit()
         # load objet from zope cache
@@ -159,10 +163,14 @@ class ClientTests(NEOFunctionalTest):
         """ Check isolation with zope cache cleared """
         self.__setup()
         t, c = self.makeTransaction()
-        c.root()['item'] = 0
+        root = c.root()
+        root['item'] = 0
+        root['other'] = 'bla'
         t.commit()
         t1, c1 = self.makeTransaction()
         t2, c2 = self.makeTransaction()
+        # Makes c2 take a snapshot of database state
+        c2.root()['other']
         c1.root()['item'] = 1
         t1.commit()
         # clear zope cache to force re-ask NEO

Modified: trunk/neo/tests/storage/testClientHandler.py
==============================================================================
--- trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -271,6 +271,7 @@ class StorageClientHandlerTests(NeoUnitT
         uuid = self.getNewUUID()
         conn = self._getConnection(uuid=uuid)
         tid = self.getNextTID()
+        ltid = self.getNextTID()
         undone_tid = self.getNextTID()
         # Keep 2 entries here, so we check findUndoTID is called only once.
         oid_list = [self.getOID(1), self.getOID(2)]
@@ -281,7 +282,7 @@ class StorageClientHandlerTests(NeoUnitT
         self.app.dm = Mock({
             'findUndoTID': ReturnValues((None, None, False), )
         })
-        self.operation.askObjectUndoSerial(conn, tid, undone_tid, oid_list)
+        self.operation.askObjectUndoSerial(conn, tid, ltid, undone_tid, oid_list)
         self.checkErrorPacket(conn)
 
     def test_askHasLock(self):

Modified: trunk/neo/tests/storage/testStorageDBTests.py
==============================================================================
--- trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -689,6 +689,7 @@ class StorageDBTests(NeoUnitTestBase):
         tid2 = self.getNextTID()
         tid3 = self.getNextTID()
         tid4 = self.getNextTID()
+        tid5 = self.getNextTID()
         oid1 = self.getOID(1)
         db.storeTransaction(
             tid1, (
@@ -699,7 +700,7 @@ class StorageDBTests(NeoUnitTestBase):
         # Result: current tid is tid1, data_tid is None (undoing object
         # creation)
         self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1, None),
+            db.findUndoTID(oid1, tid5, tid4, tid1, None),
             (tid1, None, True))
 
         # Store a new transaction
@@ -711,13 +712,13 @@ class StorageDBTests(NeoUnitTestBase):
         # Undoing oid1 tid2, OK: tid2 is latest
         # Result: current tid is tid2, data_tid is tid1
         self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid2, None),
+            db.findUndoTID(oid1, tid5, tid4, tid2, None),
             (tid2, tid1, True))
 
         # Undoing oid1 tid1, Error: tid2 is latest
         # Result: current tid is tid2, data_tid is -1
         self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1, None),
+            db.findUndoTID(oid1, tid5, tid4, tid1, None),
             (tid2, None, False))
 
         # Undoing oid1 tid1 with tid2 being undone in same transaction,
@@ -727,7 +728,7 @@ class StorageDBTests(NeoUnitTestBase):
         # Explanation of transaction_object: oid1, no data but a data serial
         # to tid1
         self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1,
+            db.findUndoTID(oid1, tid5, tid4, tid1,
                 (u64(oid1), None, None, None, tid1)),
             (tid4, None, True))
 
@@ -741,7 +742,7 @@ class StorageDBTests(NeoUnitTestBase):
         # Result: current tid is tid2, data_tid is None (undoing object
         # creation)
         self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1, None),
+            db.findUndoTID(oid1, tid5, tid4, tid1, None),
             (tid3, None, True))
 
 if __name__ == "__main__":

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Dec 14 16:56:53 2010
@@ -466,11 +466,13 @@ class ProtocolTests(NeoUnitTestBase):
 
     def test_askObjectUndoSerial(self):
         tid = self.getNextTID()
+        ltid = self.getNextTID()
         undone_tid = self.getNextTID()
         oid_list = [self.getOID(x) for x in xrange(4)]
-        p = Packets.AskObjectUndoSerial(tid, undone_tid, oid_list)
-        ptid, pundone_tid, poid_list = p.decode()
+        p = Packets.AskObjectUndoSerial(tid, ltid, undone_tid, oid_list)
+        ptid, pltid, pundone_tid, poid_list = p.decode()
         self.assertEqual(tid, ptid)
+        self.assertEqual(ltid, pltid)
         self.assertEqual(undone_tid, pundone_tid)
         self.assertEqual(oid_list, poid_list)
 




More information about the Neo-report mailing list