[Neo-report] r2465 vincent - in /trunk/neo: storage/database/ storage/handlers/ tests/stor...

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Nov 17 14:05:28 CET 2010


Author: vincent
Date: Wed Nov 17 14:05:28 2010
New Revision: 2465

Log:
Add support for complete deletion of partition tail in replication.

This can happen when enough of the most recent objects have been
transactionally un-created and the database was packed.

Modified:
    trunk/neo/storage/database/btree.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/database/mysqldb.py
    trunk/neo/storage/handlers/replication.py
    trunk/neo/tests/storage/testReplicationHandler.py
    trunk/neo/tests/storage/testStorageDBTests.py

Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -435,6 +435,13 @@ class BTreeDatabaseManager(DatabaseManag
         except KeyError:
             pass
 
+    def deleteTransactionsAbove(self, num_partitions, partition, tid):
+        # Drop this partition's transactions whose TID is strictly above tid.
+        lower_bound = util.u64(tid)
+        in_partition = lambda key, _: key % num_partitions == partition
+        batchDelete(self._trans, in_partition,
+            iter_kw={'min': lower_bound, 'excludemin': True})
+
     def deleteObject(self, oid, serial=None):
         u64 = util.u64
         oid = u64(oid)
@@ -458,6 +465,24 @@ class BTreeDatabaseManager(DatabaseManag
                 prune(obj[oid])
                 del obj[oid]
 
+    def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+        # Trim revisions of oid above serial, then drop the partition's higher oids.
+        obj_tree = self._obj
+        u_oid = util.u64(oid)
+        u_serial = util.u64(serial)
+        def in_partition(key, _=None):
+            return key % num_partitions == partition
+        if in_partition(u_oid):
+            try:
+                revisions = obj_tree[u_oid]
+            except KeyError:
+                pass
+            else:
+                batchDelete(revisions, lambda _, __: True,
+                    iter_kw={'min': u_serial, 'excludemin': True})
+        batchDelete(obj_tree, in_partition,
+            iter_kw={'min': u_oid, 'excludemin': True}, recycle_subtrees=True)
+
     def getTransaction(self, tid, all=False):
         tid = util.u64(tid)
         try:

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -388,11 +388,21 @@ class DatabaseManager(object):
         an oid list"""
         raise NotImplementedError
 
+    def deleteTransactionsAbove(self, num_partitions, partition, tid):
+        """Delete every transaction of the given partition whose TID is
+        strictly greater than tid. Abstract: subclasses must implement it."""
+        raise NotImplementedError
+
     def deleteObject(self, oid, serial=None):
         """Delete given object. If serial is given, only delete that serial for
         given oid."""
         raise NotImplementedError
 
+    def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+        """Delete, in the given partition, every object above oid and every
+        revision of oid above serial (bounds excluded). Abstract method."""
+        raise NotImplementedError
+
     def getTransaction(self, tid, all = False):
         """Return a tuple of the list of OIDs, user information,
         a description, and extension information, for a given transaction

Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -557,6 +557,19 @@ class MySQLDatabaseManager(DatabaseManag
             raise
         self.commit()
 
+    def deleteTransactionsAbove(self, num_partitions, partition, tid):
+        # Drop this partition's rows of `trans` whose tid is strictly above tid;
+        # any failure rolls the transaction back before being re-raised.
+        self.begin()
+        try:
+            values = {'partition': partition, 'tid': util.u64(tid)}
+            self.query('DELETE FROM trans WHERE partition=%(partition)d AND '
+              'tid > %(tid)d' % values)
+        except:
+            self.rollback()
+            raise
+        self.commit()
+
     def deleteObject(self, oid, serial=None):
         u64 = util.u64
         oid = u64(oid)
@@ -577,6 +590,21 @@ class MySQLDatabaseManager(DatabaseManag
             raise
         self.commit()
 
+    def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+        # Drop, in this partition only, oids > oid and revisions of oid > serial.
+        self.begin()
+        try:
+            # OR is parenthesised so that partition also filters its 2nd branch.
+            self.query('DELETE FROM obj WHERE partition=%(partition)d AND ('
+              'oid > %(oid)d OR (oid = %(oid)d AND serial > %(serial)d))' % {
+                'partition': partition, 'oid': util.u64(oid),
+                'serial': util.u64(serial),
+            })
+        except:
+            self.rollback()
+            raise
+        self.commit()
+
     def getTransaction(self, tid, all = False):
         q = self.query
         tid = util.u64(tid)

Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -242,7 +242,8 @@ class ReplicationHandler(EventHandler):
     def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
             max_tid):
         ask = conn.ask
-        replicator = self.app.replicator
+        app = self.app
+        replicator = app.replicator
         next_tid = add64(max_tid, 1)
         action, params = self._checkRange(
             replicator.getTIDCheckResult(min_tid, length) == (
@@ -261,6 +262,10 @@ class ReplicationHandler(EventHandler):
             else:
                 ask(self._doAskCheckTIDRange(min_tid, count))
         if action == CHECK_DONE:
+            # Delete all transactions we might have which are beyond what peer
+            # knows.
+            app.dm.deleteTransactionsAbove(app.pt.getPartitions(),
+              replicator.getCurrentRID(), max_tid)
             # If no more TID, a replication of transactions is finished.
             # So start to replicate objects now.
             ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
@@ -269,7 +274,8 @@ class ReplicationHandler(EventHandler):
     def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
             oid_checksum, max_oid, serial_checksum, max_serial):
         ask = conn.ask
-        replicator = self.app.replicator
+        app = self.app
+        replicator = app.replicator
         next_params = (max_oid, add64(max_serial, 1))
         action, params = self._checkRange(
             replicator.getSerialCheckResult(min_oid, min_serial, length) == (
@@ -284,6 +290,10 @@ class ReplicationHandler(EventHandler):
             ((min_oid, min_serial), count) = params
             ask(self._doAskCheckSerialRange(min_oid, min_serial, count))
         if action == CHECK_DONE:
+            # Delete all objects we might have which are beyond what peer
+            # knows.
+            app.dm.deleteObjectsAbove(app.pt.getPartitions(),
+              replicator.getCurrentRID(), max_oid, max_serial)
             # Nothing remains, so the replication for this partition is
             # finished.
             replicator.setReplicationDone()

Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -67,10 +67,12 @@ class StorageReplicationHandlerTests(Neo
         pass
 
     def getApp(self, conn=None, tid_check_result=(0, 0, ZERO_TID),
-            serial_check_result=(0, 0, ZERO_OID, 0, ZERO_TID),
-            tid_result=(),
-            history_result=None,
-            rid=0, critical_tid=ZERO_TID):
+                serial_check_result=(0, 0, ZERO_OID, 0, ZERO_TID),
+                tid_result=(),
+                history_result=None,
+                rid=0, critical_tid=ZERO_TID,
+                num_partitions=1,
+            ):
         if history_result is None:
             history_result = {}
         replicator = Mock({
@@ -99,6 +101,9 @@ class StorageReplicationHandlerTests(Neo
                 'storeTransaction': None,
                 'deleteObject': None,
             })
+            pt = Mock({
+                'getPartitions': num_partitions,
+            })
         return FakeApp
 
     def _checkReplicationStarted(self, conn, rid, replicator):
@@ -360,9 +365,10 @@ class StorageReplicationHandlerTests(Neo
         max_tid = self.getNextTID()
         length = RANGE_LENGTH / 2
         rid = 12
+        num_partitions = 13
         conn = self.getFakeConnection()
         app = self.getApp(tid_check_result=(length - 1, 0, max_tid), rid=rid,
-            conn=conn)
+            conn=conn, num_partitions=num_partitions)
         handler = ReplicationHandler(app)
         # Peer has the same data as we have: length, checksum and max_tid
         # match.
@@ -378,6 +384,10 @@ class StorageReplicationHandlerTests(Neo
         calls = app.replicator.mockGetNamedCalls('checkSerialRange')
         self.assertEqual(len(calls), 1)
         calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+        # ...and delete partition tail
+        calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(num_partitions, rid, max_tid)
 
     def test_answerCheckTIDRangeDifferentBigChunk(self):
         min_tid = self.getNextTID()
@@ -514,9 +524,10 @@ class StorageReplicationHandlerTests(Neo
         max_serial = self.getNextTID()
         length = RANGE_LENGTH / 2
         rid = 12
+        num_partitions = 13
         conn = self.getFakeConnection()
         app = self.getApp(serial_check_result=(length - 1, 0, max_oid, 1,
-            max_serial), rid=rid, conn=conn)
+            max_serial), rid=rid, conn=conn, num_partitions=num_partitions)
         handler = ReplicationHandler(app)
         # Peer has the same data as we have
         handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
@@ -524,6 +535,10 @@ class StorageReplicationHandlerTests(Neo
         # Result: mark replication as done
         self.checkNoPacketSent(conn)
         self.assertTrue(app.replicator.replication_done)
+        # ...and delete partition tail
+        calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(num_partitions, rid, max_oid, max_serial)
 
     def test_answerCheckSerialRangeDifferentBigChunk(self):
         min_oid = self.getOID(1)
@@ -590,9 +605,12 @@ class StorageReplicationHandlerTests(Neo
         critical_tid = self.getNextTID()
         length = MIN_RANGE_LENGTH - 1
         rid = 12
+        num_partitions = 13
         conn = self.getFakeConnection()
         app = self.getApp(tid_check_result=(length - 5, 0, max_oid,
-            1, max_serial), rid=rid, conn=conn, critical_tid=critical_tid)
+            1, max_serial), rid=rid, conn=conn, critical_tid=critical_tid,
+            num_partitions=num_partitions,
+        )
         handler = ReplicationHandler(app)
         # Peer has different data, and less than length
         handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
@@ -611,6 +629,10 @@ class StorageReplicationHandlerTests(Neo
         calls[0].checkArgs(pmin_oid, pmin_serial, pmax_serial, plength,
             ppartition)
         self.assertTrue(app.replicator.replication_done)
+        # ...and delete partition tail
+        calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(num_partitions, rid, max_oid, max_serial)
 
 if __name__ == "__main__":
     unittest.main()

Modified: trunk/neo/tests/storage/testStorageDBTests.py
==============================================================================
--- trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -334,6 +334,24 @@ class StorageDBTests(NeoUnitTestBase):
         self.assertEqual(self.db.getTransaction(tid1, True), None)
         self.assertEqual(self.db.getTransaction(tid2, True), None)
 
+    def test_deleteTransactionsAbove(self):
+        # With two partitions, partition 0 is expected to hold tid1 and tid3.
+        db = self.db
+        db.setNumPartitions(2)
+        tid1, tid2, tid3 = tid_list = [self.getOID(x) for x in (0, 1, 2)]
+        oid1 = self.getOID(1)
+        for tid in tid_list:
+            txn, objs = self.getTransaction([oid1])
+            db.storeTransaction(tid, objs, txn)
+            db.finishTransaction(tid)
+        db.deleteTransactionsAbove(2, 0, tid1)
+        # Right partition, at cutoff (the cutoff TID itself is excluded)
+        self.assertNotEqual(db.getTransaction(tid1, True), None)
+        # Wrong partition, above cutoff
+        self.assertNotEqual(db.getTransaction(tid2, True), None)
+        # Right partition, above cutoff
+        self.assertEqual(db.getTransaction(tid3, True), None)
+
     def test_deleteObject(self):
         oid1, oid2 = self.getOIDs(2)
         tid1, tid2 = self.getTIDs(2)
@@ -351,6 +369,31 @@ class StorageDBTests(NeoUnitTestBase):
         self.assertEqual(self.db.getObject(oid2, tid=tid2), (tid2, None) + \
             objs2[1][1:])
 
+    def test_deleteObjectsAbove(self):
+        # Three revisions of three objects. The assertions below treat oid1
+        # and oid3 as belonging to partition 0, and oid2 as outside of it.
+        db = self.db
+        db.setNumPartitions(2)
+        tid1, tid2, tid3 = tid_list = [self.getOID(x) for x in (1, 2, 3)]
+        oid1, oid2, oid3 = [self.getOID(x) for x in (0, 1, 2)]
+        # Every transaction stores a revision of all three objects.
+        for tid in tid_list:
+            txn, objs = self.getTransaction([oid1, oid2, oid3])
+            db.storeTransaction(tid, objs, txn)
+            db.finishTransaction(tid)
+        # Cut partition 0 above the (oid1, tid1) position.
+        db.deleteObjectsAbove(2, 0, oid1, tid1)
+        # Right partition, at cutoff (the cutoff revision itself is excluded)
+        self.assertNotEqual(db.getObject(oid1, tid=tid1), None)
+        # Right partition, above tid cutoff
+        for tid in (tid2, tid3):
+            self.assertEqual(db.getObject(oid1, tid=tid), False)
+        # Wrong partition, above cutoff
+        for tid in tid_list:
+            self.assertNotEqual(db.getObject(oid2, tid=tid), None)
+        # Right partition, above cutoff
+        self.assertEqual(db.getObject(oid3), None)
+
     def test_getTransaction(self):
         oid1, oid2 = self.getOIDs(2)
         tid1, tid2 = self.getTIDs(2)





More information about the Neo-report mailing list