[Neo-report] r2812 jm - in /trunk/neo: client/ tests/functional/

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Sep 1 18:00:30 CEST 2011


Author: jm
Date: Thu Sep  1 18:00:30 2011
New Revision: 2812

Log:
client: fix processing of invalidations older than snapshot tid

Also patch ZODB to fix an invalidation bug.

Modified:
    trunk/neo/client/Storage.py
    trunk/neo/client/__init__.py
    trunk/neo/tests/functional/__init__.py

Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Thu Sep  1 18:00:30 2011
@@ -39,6 +39,15 @@ class Storage(BaseStorage.BaseStorage,
               ConflictResolution.ConflictResolvingStorage):
     """Wrapper class for neoclient."""
 
+    # Stores the highest TID visible for current transaction.
+    # First call sets this snapshot by asking master node most recent
+    # committed TID.
+    # As a (positive) side-effect, this forces us to handle all pending
+    # invalidations, so we get a very recent view of the database (which is
+    # good when multiple databases are used in the same program with some
+    # amount of referential integrity).
+    # Should remain None when not bound to a connection,
+    # so that it always read the last revision.
     _snapshot_tid = None
 
     implements(*filter(None, (
@@ -70,8 +79,6 @@ class Storage(BaseStorage.BaseStorage,
         self._is_read_only = read_only
         if _app is None:
             _app = Application(master_nodes, name, compress=compress)
-            # always read the last revision when not bound to a connection
-            self._getSnapshotTID = lambda: None
         self.app = _app
         # Used to clone self (see new_instance & IMVCCStorage definition).
         self._init_args = (master_nodes, name)
@@ -87,34 +94,13 @@ class Storage(BaseStorage.BaseStorage,
     def _cache(self):
         return self.app._cache
 
-    def _getSnapshotTID(self):
-        """
-        Get the highest TID visible for current transaction.
-        First call sets this snapshot by asking master node most recent
-        committed TID.
-        As a (positive) side-effect, this forces us to handle all pending
-        invalidations, so we get a very recent view of the database (which is
-        good when multiple databases are used in the same program with some
-        amount of referential integrity).
-        """
-        tid = self._snapshot_tid
-        if tid is None:
-            tid = self.lastTransaction()
-            if tid is ZERO_TID:
-                raise NEOStorageDoesNotExistError('No transaction in storage')
-            # Increment by one, as we will use this as an excluded upper
-            # bound (loadBefore).
-            tid = add64(tid, 1)
-            self._snapshot_tid = tid
-        return tid
-
     def load(self, oid, version=''):
         # XXX: interface definition states that version parameter is
         # mandatory, while some ZODB tests do not provide it. For now, make
         # it optional.
         assert version == '', 'Versions are not supported'
         try:
-            return self.app.load(oid, None, self._getSnapshotTID())[:2]
+            return self.app.load(oid, None, self._snapshot_tid)[:2]
         except NEOStorageNotFoundError:
             raise POSException.POSKeyError(oid)
 
@@ -181,7 +167,7 @@ class Storage(BaseStorage.BaseStorage,
     # undo
     @check_read_only
     def undo(self, transaction_id, txn):
-        return self.app.undo(self._getSnapshotTID(), undone_tid=transaction_id,
+        return self.app.undo(self._snapshot_tid, undone_tid=transaction_id,
             txn=txn, tryToResolveConflict=self.tryToResolveConflict)
 
 
@@ -205,7 +191,7 @@ class Storage(BaseStorage.BaseStorage,
 
     def loadEx(self, oid, version):
         try:
-            data, serial, _ = self.app.load(oid, None, self._getSnapshotTID())
+            data, serial, _ = self.app.load(oid, None, self._snapshot_tid)
         except NEOStorageNotFoundError:
             raise POSException.POSKeyError(oid)
         return data, serial, ''
@@ -223,7 +209,12 @@ class Storage(BaseStorage.BaseStorage,
             raise KeyError
 
     def sync(self, force=True):
-        self._snapshot_tid = None
+        # XXX: Unfortunately, we're quite slow (lastTransaction) and
+        #      we're also called at the end of each transaction by ZODB
+        #      (see Connection.afterCompletion), probably for no useful reason.
+        # Increment by one, as we will use this as an excluded upper
+        # bound (loadBefore).
+        self._snapshot_tid = add64(self.lastTransaction(), 1)
 
     def copyTransactionsFrom(self, source, verbose=False):
         """ Zope compliant API """

Modified: trunk/neo/client/__init__.py
==============================================================================
--- trunk/neo/client/__init__.py [iso-8859-1] (original)
+++ trunk/neo/client/__init__.py [iso-8859-1] Thu Sep  1 18:00:30 2011
@@ -50,6 +50,45 @@ if needs_patch:
 
     Connection.tpc_finish = tpc_finish
 
+    try:
+        if Connection._nexedi_fix != 1:
+            raise Exception("A different ZODB fix is already applied")
+    except AttributeError:
+        Connection._nexedi_fix = 1
+
+        # Whenever an connection is opened (and there's usually an existing one
+        # in DB pool that can be reused) whereas the transaction is already
+        # started, we must make sure that proper storage setup is done by
+        # calling Connection.newTransaction.
+        # For example, there's no open transaction when a ZPublisher/Publish
+        # transaction begins.
+
+        def open(self, *args, **kw):
+            def _flush_invalidations():
+                acquire = self._db._a
+                try:
+                    self._db._r()
+                except thread.error:
+                    acquire = lambda: None
+                try:
+                    del self._flush_invalidations
+                    self.newTransaction()
+                finally:
+                    acquire()
+                    self._flush_invalidations = _flush_invalidations
+            self._flush_invalidations = _flush_invalidations
+            try:
+                Connection_open(self, *args, **kw)
+            finally:
+                del self._flush_invalidations
+        try:
+            Connection_open = Connection._setDB
+            Connection._setDB = open
+        except AttributeError: # recent ZODB
+            Connection_open = Connection.open
+            Connection.open = open
+
+
     class _DB(object):
         """
         Wrapper to DB instance that properly initialize Connection objects

Modified: trunk/neo/tests/functional/__init__.py
==============================================================================
--- trunk/neo/tests/functional/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/__init__.py [iso-8859-1] Thu Sep  1 18:00:30 2011
@@ -30,6 +30,7 @@ import tempfile
 import traceback
 import threading
 import psutil
+import transaction
 
 import neo.scripts
 from neo.neoctl.neoctl import NeoCTL, NotReadyException
@@ -654,6 +655,13 @@ class NEOCluster(object):
 
 class NEOFunctionalTest(NeoTestBase):
 
+    def tearDown(self):
+        # Kill all unfinished transactions for next test.
+        # Note we don't even abort them because it may require a valid
+        # connection to a master node (see Storage.sync()).
+        transaction.manager.__init__()
+        NeoTestBase.tearDown(self)
+
     def setupLog(self):
         log_file = os.path.join(self.getTempDirectory(), 'test.log')
         setupLog('TEST', log_file, True)




More information about the Neo-report mailing list