[Neo-report] r1780 gregory - /trunk/neo/tests/zodb/testZODB.py

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Feb 16 17:42:06 CET 2010


Author: gregory
Date: Tue Feb 16 17:42:05 2010
New Revision: 1780

Log:
Remove ZODB test module copy, use it from imports.

Modified:
    trunk/neo/tests/zodb/testZODB.py

Modified: trunk/neo/tests/zodb/testZODB.py
==============================================================================
--- trunk/neo/tests/zodb/testZODB.py [iso-8859-1] (original)
+++ trunk/neo/tests/zodb/testZODB.py [iso-8859-1] Tue Feb 16 17:42:05 2010
@@ -1,715 +1,28 @@
-##############################################################################
 #
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
+# Copyright (C) 2009-2010  Nexedi SA
 #
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
 #
-##############################################################################
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
 import unittest
-
-import ZODB
-import ZODB.FileStorage
-from ZODB.POSException import ReadConflictError
-from ZODB.POSException import TransactionFailedError
-from ZODB.tests.warnhook import WarningsHook
-
-from persistent import Persistent
-from persistent.mapping import PersistentMapping
-import transaction
+from ZODB.tests.testZODB import ZODBTests
 
 from neo.tests.zodb import ZODBTestCase
 
-
-class P(Persistent):
+class NEOZODBTests(ZODBTestCase, ZODBTests):
     pass
 
-class Independent(Persistent):
-
-    def _p_independent(self):
-        return 1
-
-class DecoyIndependent(Persistent):
-
-    def _p_independent(self):
-        return 0
-
-class ZODBTests(ZODBTestCase):
-
-    def populate(self):
-        transaction.begin()
-        conn = self._db.open()
-        root = conn.root()
-        root['test'] = pm = PersistentMapping()
-        for n in range(100):
-            pm[n] = PersistentMapping({0: 100 - n})
-        transaction.get().note('created test data')
-        transaction.commit()
-        conn.close()
-
-    def checkExportImport(self, abort_it=False):
-        self.populate()
-        conn = self._db.open()
-        try:
-            self.duplicate(conn, abort_it)
-        finally:
-            conn.close()
-        conn = self._db.open()
-        try:
-            self.verify(conn, abort_it)
-        finally:
-            conn.close()
-
-    def duplicate(self, conn, abort_it):
-        transaction.begin()
-        transaction.get().note('duplication')
-        root = conn.root()
-        ob = root['test']
-        assert len(ob) > 10, 'Insufficient test data'
-        try:
-            import tempfile
-            f = tempfile.TemporaryFile()
-            ob._p_jar.exportFile(ob._p_oid, f)
-            assert f.tell() > 0, 'Did not export correctly'
-            f.seek(0)
-            new_ob = ob._p_jar.importFile(f)
-            self.assertEqual(new_ob, ob)
-            root['dup'] = new_ob
-            f.close()
-            if abort_it:
-                transaction.abort()
-            else:
-                transaction.commit()
-        except:
-            transaction.abort()
-            raise
-
-    def verify(self, conn, abort_it):
-        transaction.begin()
-        root = conn.root()
-        ob = root['test']
-        try:
-            ob2 = root['dup']
-        except KeyError:
-            if abort_it:
-                # Passed the test.
-                return
-            else:
-                raise
-        else:
-            self.assertTrue(not abort_it, 'Did not abort duplication')
-        l1 = list(ob.items())
-        l1.sort()
-        l2 = list(ob2.items())
-        l2.sort()
-        l1 = map(lambda (k, v): (k, v[0]), l1)
-        l2 = map(lambda (k, v): (k, v[0]), l2)
-        self.assertEqual(l1, l2)
-        self.assert_(ob._p_oid != ob2._p_oid)
-        self.assertEqual(ob._p_jar, ob2._p_jar)
-        oids = {}
-        for v in ob.values():
-            oids[v._p_oid] = 1
-        for v in ob2.values():
-            assert not oids.has_key(v._p_oid), (
-                'Did not fully separate duplicate from original')
-        transaction.commit()
-
-    def checkExportImportAborted(self):
-        self.checkExportImport(abort_it=True)
-
-    def checkVersionOnly(self):
-        # Make sure the changes to make empty transactions a no-op
-        # still allow things like abortVersion().  This should work
-        # because abortVersion() calls tpc_begin() itself.
-        conn = self._db.open("version")
-        try:
-            r = conn.root()
-            r[1] = 1
-            transaction.commit()
-        finally:
-            conn.close()
-        self._db.abortVersion("version")
-        transaction.commit()
-
-    def checkResetCache(self):
-        # The cache size after a reset should be 0.  Note that
-        # _resetCache is not a public API, but the resetCaches()
-        # function is, and resetCaches() causes _resetCache() to be
-        # called.
-        self.populate()
-        conn = self._db.open()
-        conn.root()
-        self.assert_(len(conn._cache) > 0)  # Precondition
-        conn._resetCache()
-        self.assertEqual(len(conn._cache), 0)
-
-    def checkResetCachesAPI(self):
-        # Checks the resetCaches() API.
-        # (resetCaches used to be called updateCodeTimestamp.)
-        self.populate()
-        conn = self._db.open()
-        conn.root()
-        self.assert_(len(conn._cache) > 0)  # Precondition
-        ZODB.Connection.resetCaches()
-        conn.close()
-        self.assert_(len(conn._cache) > 0)  # Still not flushed
-        conn._setDB(self._db)  # simulate the connection being reopened
-        self.assertEqual(len(conn._cache), 0)
-
-    def checkExplicitTransactionManager(self):
-        # Test of transactions that apply to only the connection,
-        # not the thread.
-        tm1 = transaction.TransactionManager()
-        conn1 = self._db.open(transaction_manager=tm1)
-        tm2 = transaction.TransactionManager()
-        conn2 = self._db.open(transaction_manager=tm2)
-        try:
-            r1 = conn1.root()
-            r2 = conn2.root()
-            if r1.has_key('item'):
-                del r1['item']
-                tm1.get().commit()
-            r1.get('item')
-            r2.get('item')
-            r1['item'] = 1
-            tm1.get().commit()
-            self.assertEqual(r1['item'], 1)
-            # r2 has not seen a transaction boundary,
-            # so it should be unchanged.
-            self.assertEqual(r2.get('item'), None)
-            conn2.sync()
-            # Now r2 is updated.
-            self.assertEqual(r2['item'], 1)
-
-            # Now, for good measure, send an update in the other direction.
-            r2['item'] = 2
-            tm2.get().commit()
-            self.assertEqual(r1['item'], 1)
-            self.assertEqual(r2['item'], 2)
-            conn1.sync()
-            conn2.sync()
-            self.assertEqual(r1['item'], 2)
-            self.assertEqual(r2['item'], 2)
-        finally:
-            conn1.close()
-            conn2.close()
-
-    def checkLocalTransactions(self):
-        # Test of transactions that apply to only the connection,
-        # not the thread.
-        conn1 = self._db.open()
-        conn2 = self._db.open()
-        hook = WarningsHook()
-        hook.install()
-        try:
-            conn1.setLocalTransaction()
-            conn2.setLocalTransaction()
-            r1 = conn1.root()
-            r2 = conn2.root()
-            if r1.has_key('item'):
-                del r1['item']
-                conn1.getTransaction().commit()
-            r1.get('item')
-            r2.get('item')
-            r1['item'] = 1
-            conn1.getTransaction().commit()
-            self.assertEqual(r1['item'], 1)
-            # r2 has not seen a transaction boundary,
-            # so it should be unchanged.
-            self.assertEqual(r2.get('item'), None)
-            conn2.sync()
-            # Now r2 is updated.
-            self.assertEqual(r2['item'], 1)
-
-            # Now, for good measure, send an update in the other direction.
-            r2['item'] = 2
-            conn2.getTransaction().commit()
-            self.assertEqual(r1['item'], 1)
-            self.assertEqual(r2['item'], 2)
-            conn1.sync()
-            conn2.sync()
-            self.assertEqual(r1['item'], 2)
-            self.assertEqual(r2['item'], 2)
-            for msg, obj, filename, lineno in hook.warnings:
-                self.assert_(msg in [
-                    "This will be removed in ZODB 3.6:\n"
-                        "setLocalTransaction() is deprecated. "
-                        "Use the transaction_manager argument "
-                        "to DB.open() instead.",
-                    "This will be removed in ZODB 3.6:\n"
-                        "getTransaction() is deprecated. "
-                        "Use the transaction_manager argument "
-                        "to DB.open() instead, or access "
-                        ".transaction_manager directly on the Connection."])
-        finally:
-            conn1.close()
-            conn2.close()
-            hook.uninstall()
-
-    def checkReadConflict(self):
-        self.obj = P()
-        self.readConflict()
-
-    def readConflict(self, shouldFail=True):
-        # Two transactions run concurrently.  Each reads some object,
-        # then one commits and the other tries to read an object
-        # modified by the first.  This read should fail with a conflict
-        # error because the object state read is not necessarily
-        # consistent with the objects read earlier in the transaction.
-
-        tm1 = transaction.TransactionManager()
-        conn = self._db.open(mvcc=False, transaction_manager=tm1)
-        r1 = conn.root()
-        r1["p"] = self.obj
-        self.obj.child1 = P()
-        tm1.get().commit()
-
-        # start a new transaction with a new connection
-        tm2 = transaction.TransactionManager()
-        cn2 = self._db.open(mvcc=False, transaction_manager=tm2)
-        # start a new transaction with the other connection
-        r2 = cn2.root()
-
-        self.assertEqual(r1._p_serial, r2._p_serial)
-
-        self.obj.child2 = P()
-        tm1.get().commit()
-
-        # resume the transaction using cn2
-        obj = r2["p"]
-        # An attempt to access obj should fail, because r2 was read
-        # earlier in the transaction and obj was modified by the othe
-        # transaction.
-        if shouldFail:
-            self.assertRaises(ReadConflictError, lambda: obj.child1)
-            # And since ReadConflictError was raised, attempting to commit
-            # the transaction should re-raise it.  checkNotIndependent()
-            # failed this part of the test for a long time.
-            self.assertRaises(ReadConflictError, tm2.get().commit)
-
-            # And since that commit failed, trying to commit again should
-            # fail again.
-            self.assertRaises(TransactionFailedError, tm2.get().commit)
-            # And again.
-            self.assertRaises(TransactionFailedError, tm2.get().commit)
-            # Etc.
-            self.assertRaises(TransactionFailedError, tm2.get().commit)
-
-        else:
-            # make sure that accessing the object succeeds
-            obj.child1
-        tm2.get().abort()
-
-    def checkReadConflictIgnored(self):
-        # Test that an application that catches a read conflict and
-        # continues can not commit the transaction later.
-        root = self._db.open(mvcc=False).root()
-        root["real_data"] = real_data = PersistentMapping()
-        root["index"] = index = PersistentMapping()
-
-        real_data["a"] = PersistentMapping({"indexed_value": 0})
-        real_data["b"] = PersistentMapping({"indexed_value": 1})
-        index[1] = PersistentMapping({"b": 1})
-        index[0] = PersistentMapping({"a": 1})
-        transaction.commit()
-
-        # load some objects from one connection
-        tm = transaction.TransactionManager()
-        cn2 = self._db.open(mvcc=False, transaction_manager=tm)
-        r2 = cn2.root()
-        real_data2 = r2["real_data"]
-        index2 = r2["index"]
-
-        real_data["b"]["indexed_value"] = 0
-        del index[1]["b"]
-        index[0]["b"] = 1
-        transaction.commit()
-
-        del real_data2["a"]
-        try:
-            del index2[0]["a"]
-        except ReadConflictError:
-            # This is the crux of the text.  Ignore the error.
-            pass
-        else:
-            self.fail("No conflict occurred")
-
-        # real_data2 still ready to commit
-        self.assert_(real_data2._p_changed)
-
-        # index2 values not ready to commit
-        self.assert_(not index2._p_changed)
-        self.assert_(not index2[0]._p_changed)
-        self.assert_(not index2[1]._p_changed)
-
-        self.assertRaises(ReadConflictError, tm.get().commit)
-        self.assertRaises(TransactionFailedError, tm.get().commit)
-        tm.get().abort()
-
-    def checkIndependent(self):
-        self.obj = Independent()
-        self.readConflict(shouldFail=False)
-
-    def checkNotIndependent(self):
-        self.obj = DecoyIndependent()
-        self.readConflict()
-
-    def checkSubtxnCommitDoesntGetInvalidations(self):
-        # Prior to ZODB 3.2.9 and 3.4, Connection.tpc_finish() processed
-        # invalidations even for a subtxn commit.  This could make
-        # inconsistent state visible after a subtxn commit.  There was a
-        # suspicion that POSKeyError was possible as a result, but I wasn't
-        # able to construct a case where that happened.
-
-        # Set up the database, to hold
-        # root --> "p" -> value = 1
-        #      --> "q" -> value = 2
-        tm1 = transaction.TransactionManager()
-        conn = self._db.open(transaction_manager=tm1)
-        r1 = conn.root()
-        p = P()
-        p.value = 1
-        r1["p"] = p
-        q = P()
-        q.value = 2
-        r1["q"] = q
-        tm1.commit()
-
-        # Now txn T1 changes p.value to 3 locally (subtxn commit).
-        p.value = 3
-        tm1.commit(True)
-
-        # Start new txn T2 with a new connection.
-        tm2 = transaction.TransactionManager()
-        cn2 = self._db.open(transaction_manager=tm2)
-        r2 = cn2.root()
-        p2 = r2["p"]
-        self.assertEqual(p._p_oid, p2._p_oid)
-        # T2 shouldn't see T1's change of p.value to 3, because T1 didn't
-        # commit yet.
-        self.assertEqual(p2.value, 1)
-        # Change p.value to 4, and q.value to 5.  Neither should be visible
-        # to T1, because T1 is still in progress.
-        p2.value = 4
-        q2 = r2["q"]
-        self.assertEqual(q._p_oid, q2._p_oid)
-        self.assertEqual(q2.value, 2)
-        q2.value = 5
-        tm2.commit()
-
-        # Back to T1.  p and q still have the expected values.
-        rt = conn.root()
-        self.assertEqual(rt["p"].value, 3)
-        self.assertEqual(rt["q"].value, 2)
-
-        # Now do another subtxn commit in T1.  This shouldn't change what
-        # T1 sees for p and q.
-        rt["r"] = P()
-        tm1.commit(True)
-
-        # Doing that subtxn commit in T1 should not process invalidations
-        # from T2's commit.  p.value should still be 3 here (because that's
-        # what T1 subtxn-committed earlier), and q.value should still be 2.
-        # Prior to ZODB 3.2.9 and 3.4, q.value was 5 here.
-        rt = conn.root()
-        try:
-            self.assertEqual(rt["p"].value, 3)
-            self.assertEqual(rt["q"].value, 2)
-        finally:
-            tm1.abort()
-
-    def checkReadConflictErrorClearedDuringAbort(self):
-        # When a transaction is aborted, the "memory" of which
-        # objects were the cause of a ReadConflictError during
-        # that transaction should be cleared.
-        root = self._db.open(mvcc=False).root()
-        data = PersistentMapping({'d': 1})
-        root["data"] = data
-        transaction.commit()
-
-        # Provoke a ReadConflictError.
-        tm2 = transaction.TransactionManager()
-        cn2 = self._db.open(mvcc=False, transaction_manager=tm2)
-        r2 = cn2.root()
-        data2 = r2["data"]
-
-        data['d'] = 2
-        transaction.commit()
-
-        try:
-            data2['d'] = 3
-        except ReadConflictError:
-            pass
-        else:
-            self.fail("No conflict occurred")
-
-        # Explicitly abort cn2's transaction.
-        tm2.get().abort()
-
-        # cn2 should retain no memory of the read conflict after an abort(),
-        # but 3.2.3 had a bug wherein it did.
-        data_conflicts = data._p_jar._conflicts
-        data2_conflicts = data2._p_jar._conflicts
-        self.failIf(data_conflicts)
-        self.failIf(data2_conflicts)  # this used to fail
-
-        # And because of that, we still couldn't commit a change to data2['d']
-        # in the new transaction.
-        cn2.sync()  # process the invalidation for data2['d']
-        data2['d'] = 3
-        tm2.get().commit()  # 3.2.3 used to raise ReadConflictError
-
-        cn2.close()
-
-    def checkTxnBeginImpliesAbort(self):
-        # begin() should do an abort() first, if needed.
-        cn = self._db.open()
-        rt = cn.root()
-        rt['a'] = 1
-
-        transaction.begin()  # should abort adding 'a' to the root
-        rt = cn.root()
-        self.assertRaises(KeyError, rt.__getitem__, 'a')
-
-        # A longstanding bug:  this didn't work if changes were only in
-        # subtransactions.
-        transaction.begin()
-        rt = cn.root()
-        rt['a'] = 2
-        transaction.commit(1)
-
-        transaction.begin()
-        rt = cn.root()
-        self.assertRaises(KeyError, rt.__getitem__, 'a')
-
-        # One more time, mixing "top level" and subtransaction changes.
-        transaction.begin()
-        rt = cn.root()
-        rt['a'] = 3
-        transaction.commit(1)
-        rt['b'] = 4
-
-        transaction.begin()
-        rt = cn.root()
-        self.assertRaises(KeyError, rt.__getitem__, 'a')
-        self.assertRaises(KeyError, rt.__getitem__, 'b')
-
-        # That used methods of the default transaction *manager*.  Alas,
-        # that's not necessarily the same as using methods of the current
-        # transaction, and, in fact, when this test was written,
-        # Transaction.begin() didn't do anything (everything from here
-        # down failed).
-
-        # Oh, bleech.  Since Transaction.begin is also deprecated, we have
-        # to goof around suppressing the deprecation warning.
-        import warnings
-
-        # First verify that Transaction.begin *is* deprecated, by turning
-        # the warning into an error.
-        warnings.filterwarnings("error", category=DeprecationWarning)
-        self.assertRaises(DeprecationWarning, transaction.get().begin)
-        del warnings.filters[0]
-
-        # Now ignore DeprecationWarnings for the duration.  Use a
-        # try/finally block to ensure we reenable DeprecationWarnings
-        # no matter what.
-        warnings.filterwarnings("ignore", category=DeprecationWarning)
-        try:
-            cn = self._db.open()
-            rt = cn.root()
-            rt['a'] = 1
-
-            transaction.get().begin()  # should abort adding 'a' to the root
-            rt = cn.root()
-            self.assertRaises(KeyError, rt.__getitem__, 'a')
-
-            # A longstanding bug:  this didn't work if changes were only in
-            # subtransactions.
-            transaction.get().begin()
-            rt = cn.root()
-            rt['a'] = 2
-            transaction.get().commit(1)
-
-            transaction.get().begin()
-            rt = cn.root()
-            self.assertRaises(KeyError, rt.__getitem__, 'a')
-
-            # One more time, mixing "top level" and subtransaction changes.
-            transaction.get().begin()
-            rt = cn.root()
-            rt['a'] = 3
-            transaction.get().commit(1)
-            rt['b'] = 4
-
-            transaction.get().begin()
-            rt = cn.root()
-            self.assertRaises(KeyError, rt.__getitem__, 'a')
-            self.assertRaises(KeyError, rt.__getitem__, 'b')
-
-            cn.close()
-
-        finally:
-            del warnings.filters[0]
-
-    def checkFailingCommitSticks(self):
-        # See also checkFailingSubtransactionCommitSticks.
-        cn = self._db.open()
-        rt = cn.root()
-        rt['a'] = 1
-
-        # Arrange for commit to fail during tpc_vote.
-        poisoned = PoisonedObject(PoisonedJar(break_tpc_vote=True))
-        transaction.get().register(poisoned)
-
-        self.assertRaises(PoisonedError, transaction.get().commit)
-        # Trying to commit again fails too.
-        self.assertRaises(TransactionFailedError, transaction.get().commit)
-        self.assertRaises(TransactionFailedError, transaction.get().commit)
-        self.assertRaises(TransactionFailedError, transaction.get().commit)
-
-        # The change to rt['a'] is lost.
-        self.assertRaises(KeyError, rt.__getitem__, 'a')
-
-        # Trying to modify an object also fails, because Transaction.join()
-        # also raises TransactionFailedError.
-        self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
-
-        # Clean up via abort(), and try again.
-        transaction.get().abort()
-        rt['a'] = 1
-        transaction.get().commit()
-        self.assertEqual(rt['a'], 1)
-
-        # Cleaning up via begin() should also work.
-        rt['a'] = 2
-        transaction.get().register(poisoned)
-        self.assertRaises(PoisonedError, transaction.get().commit)
-        self.assertRaises(TransactionFailedError, transaction.get().commit)
-        # The change to rt['a'] is lost.
-        self.assertEqual(rt['a'], 1)
-        # Trying to modify an object also fails.
-        self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
-        # Clean up via begin(), and try again.
-        transaction.begin()
-        rt['a'] = 2
-        transaction.get().commit()
-        self.assertEqual(rt['a'], 2)
-
-        cn.close()
-
-    def checkFailingSubtransactionCommitSticks(self):
-        cn = self._db.open()
-        rt = cn.root()
-        rt['a'] = 1
-        transaction.get().commit(True)
-        self.assertEqual(rt['a'], 1)
-
-        rt['b'] = 2
-
-        # Subtransactions don't do tpc_vote, so we poison tpc_begin.
-        poisoned = PoisonedJar()
-        transaction.get().join(poisoned)
-        poisoned.break_savepoint = True
-        self.assertRaises(PoisonedError, transaction.get().commit, True)
-        # Trying to subtxn-commit again fails too.
-        self.assertRaises(TransactionFailedError,
-                          transaction.get().commit, True)
-        self.assertRaises(TransactionFailedError,
-                          transaction.get().commit, True)
-        # Top-level commit also fails.
-        self.assertRaises(TransactionFailedError, transaction.get().commit)
-
-        # The changes to rt['a'] and rt['b'] are lost.
-        self.assertRaises(KeyError, rt.__getitem__, 'a')
-        self.assertRaises(KeyError, rt.__getitem__, 'b')
-
-        # Trying to modify an object also fails, because Transaction.join()
-        # also raises TransactionFailedError.
-        self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
-
-
-        # Clean up via abort(), and try again.
-        transaction.get().abort()
-        rt['a'] = 1
-        transaction.get().commit()
-        self.assertEqual(rt['a'], 1)
-
-        # Cleaning up via begin() should also work.
-        rt['a'] = 2
-        poisoned = PoisonedJar()
-        transaction.get().join(poisoned)
-        poisoned.break_savepoint = True
-        self.assertRaises(PoisonedError, transaction.get().commit, True)
-        self.assertRaises(TransactionFailedError,
-                          transaction.get().commit, True)
-
-        # The change to rt['a'] is lost.
-        self.assertEqual(rt['a'], 1)
-        # Trying to modify an object also fails.
-        self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
-
-        # Clean up via begin(), and try again.
-        transaction.begin()
-        rt['a'] = 2
-        transaction.get().commit(True)
-        self.assertEqual(rt['a'], 2)
-        transaction.get().commit()
-
-        cn2 = self._db.open()
-        rt = cn.root()
-        self.assertEqual(rt['a'], 2)
-
-        cn.close()
-        cn2.close()
-
-class PoisonedError(Exception):
-    pass
-
-# PoisonedJar arranges to raise exceptions from interesting places.
-# For whatever reason, subtransaction commits don't call tpc_vote.
-class PoisonedJar:
-    def __init__(self, break_tpc_begin=False, break_tpc_vote=False,
-                 break_savepoint=False):
-        self.break_tpc_begin = break_tpc_begin
-        self.break_tpc_vote = break_tpc_vote
-        self.break_savepoint = break_savepoint
-
-    def sortKey(self):
-        return str(id(self))
-
-    # A way to poison a subtransaction commit.
-    def tpc_begin(self, *args):
-        if self.break_tpc_begin:
-            raise PoisonedError("tpc_begin fails")
-
-    # A way to poison a top-level commit.
-    def tpc_vote(self, *args):
-        if self.break_tpc_vote:
-            raise PoisonedError("tpc_vote fails")
-
-    def savepoint(self):
-        if self.break_savepoint:
-            raise PoisonedError("savepoint fails")
-
-    def commit(*args):
-        pass
-
-    def abort(*self):
-        pass
-
-
-class PoisonedObject:
-    def __init__(self, poisonedjar):
-        self._p_jar = poisonedjar
-
-def test_suite():
-    return unittest.makeSuite(ZODBTests, 'check')
-
 if __name__ == "__main__":
-    unittest.main(defaultTest="test_suite")
+    suite = unittest.makeSuite(NEOZODBTests, 'check')
+    unittest.TextTestRunner().run(suite)





More information about the Neo-report mailing list