[Neo-report] r2812 jm - in /trunk/neo: client/ tests/functional/
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Sep 1 18:00:30 CEST 2011
Author: jm
Date: Thu Sep 1 18:00:30 2011
New Revision: 2812
Log:
client: fix processing of invalidations older than snapshot tid
Also patch ZODB to fix an invalidation bug.
Modified:
trunk/neo/client/Storage.py
trunk/neo/client/__init__.py
trunk/neo/tests/functional/__init__.py
Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Thu Sep 1 18:00:30 2011
@@ -39,6 +39,15 @@ class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient."""
+ # Stores the highest TID visible for current transaction.
+ # First call sets this snapshot by asking the master node for the most
+ # recent committed TID.
+ # As a (positive) side-effect, this forces us to handle all pending
+ # invalidations, so we get a very recent view of the database (which is
+ # good when multiple databases are used in the same program with some
+ # amount of referential integrity).
+ # Should remain None when not bound to a connection,
+ # so that it always reads the last revision.
_snapshot_tid = None
implements(*filter(None, (
@@ -70,8 +79,6 @@ class Storage(BaseStorage.BaseStorage,
self._is_read_only = read_only
if _app is None:
_app = Application(master_nodes, name, compress=compress)
- # always read the last revision when not bound to a connection
- self._getSnapshotTID = lambda: None
self.app = _app
# Used to clone self (see new_instance & IMVCCStorage definition).
self._init_args = (master_nodes, name)
@@ -87,34 +94,13 @@ class Storage(BaseStorage.BaseStorage,
def _cache(self):
return self.app._cache
- def _getSnapshotTID(self):
- """
- Get the highest TID visible for current transaction.
- First call sets this snapshot by asking master node most recent
- committed TID.
- As a (positive) side-effect, this forces us to handle all pending
- invalidations, so we get a very recent view of the database (which is
- good when multiple databases are used in the same program with some
- amount of referential integrity).
- """
- tid = self._snapshot_tid
- if tid is None:
- tid = self.lastTransaction()
- if tid is ZERO_TID:
- raise NEOStorageDoesNotExistError('No transaction in storage')
- # Increment by one, as we will use this as an excluded upper
- # bound (loadBefore).
- tid = add64(tid, 1)
- self._snapshot_tid = tid
- return tid
-
def load(self, oid, version=''):
# XXX: interface definition states that version parameter is
# mandatory, while some ZODB tests do not provide it. For now, make
# it optional.
assert version == '', 'Versions are not supported'
try:
- return self.app.load(oid, None, self._getSnapshotTID())[:2]
+ return self.app.load(oid, None, self._snapshot_tid)[:2]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
@@ -181,7 +167,7 @@ class Storage(BaseStorage.BaseStorage,
# undo
@check_read_only
def undo(self, transaction_id, txn):
- return self.app.undo(self._getSnapshotTID(), undone_tid=transaction_id,
+ return self.app.undo(self._snapshot_tid, undone_tid=transaction_id,
txn=txn, tryToResolveConflict=self.tryToResolveConflict)
@@ -205,7 +191,7 @@ class Storage(BaseStorage.BaseStorage,
def loadEx(self, oid, version):
try:
- data, serial, _ = self.app.load(oid, None, self._getSnapshotTID())
+ data, serial, _ = self.app.load(oid, None, self._snapshot_tid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
return data, serial, ''
@@ -223,7 +209,12 @@ class Storage(BaseStorage.BaseStorage,
raise KeyError
def sync(self, force=True):
- self._snapshot_tid = None
+ # XXX: Unfortunately, we're quite slow (lastTransaction) and
+ # we're also called at the end of each transaction by ZODB
+ # (see Connection.afterCompletion), probably for no useful reason.
+ # Increment by one, as we will use this as an excluded upper
+ # bound (loadBefore).
+ self._snapshot_tid = add64(self.lastTransaction(), 1)
def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """
Modified: trunk/neo/client/__init__.py
==============================================================================
--- trunk/neo/client/__init__.py [iso-8859-1] (original)
+++ trunk/neo/client/__init__.py [iso-8859-1] Thu Sep 1 18:00:30 2011
@@ -50,6 +50,45 @@ if needs_patch:
Connection.tpc_finish = tpc_finish
+ try:
+ if Connection._nexedi_fix != 1:
+ raise Exception("A different ZODB fix is already applied")
+ except AttributeError:
+ Connection._nexedi_fix = 1
+
+ # Whenever a connection is opened (and there's usually an existing one
+ # in DB pool that can be reused) whereas the transaction is already
+ # started, we must make sure that proper storage setup is done by
+ # calling Connection.newTransaction.
+ # For example, there's no open transaction when a ZPublisher/Publish
+ # transaction begins.
+
+ def open(self, *args, **kw):
+ def _flush_invalidations():
+ acquire = self._db._a
+ try:
+ self._db._r()
+ except thread.error:
+ acquire = lambda: None
+ try:
+ del self._flush_invalidations
+ self.newTransaction()
+ finally:
+ acquire()
+ self._flush_invalidations = _flush_invalidations
+ self._flush_invalidations = _flush_invalidations
+ try:
+ Connection_open(self, *args, **kw)
+ finally:
+ del self._flush_invalidations
+ try:
+ Connection_open = Connection._setDB
+ Connection._setDB = open
+ except AttributeError: # recent ZODB
+ Connection_open = Connection.open
+ Connection.open = open
+
+
class _DB(object):
"""
Wrapper to DB instance that properly initialize Connection objects
Modified: trunk/neo/tests/functional/__init__.py
==============================================================================
--- trunk/neo/tests/functional/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/__init__.py [iso-8859-1] Thu Sep 1 18:00:30 2011
@@ -30,6 +30,7 @@ import tempfile
import traceback
import threading
import psutil
+import transaction
import neo.scripts
from neo.neoctl.neoctl import NeoCTL, NotReadyException
@@ -654,6 +655,13 @@ class NEOCluster(object):
class NEOFunctionalTest(NeoTestBase):
+ def tearDown(self):
+ # Kill all unfinished transactions for next test.
+ # Note we don't even abort them because it may require a valid
+ # connection to a master node (see Storage.sync()).
+ transaction.manager.__init__()
+ NeoTestBase.tearDown(self)
+
def setupLog(self):
log_file = os.path.join(self.getTempDirectory(), 'test.log')
setupLog('TEST', log_file, True)
More information about the Neo-report
mailing list