[Neo-report] r2476 vincent - in /trunk/neo: storage/database/ storage/handlers/ tests/stor...

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Dec 7 22:25:16 CET 2010


Author: vincent
Date: Tue Dec  7 22:25:15 2010
New Revision: 2476

Log:
Fix replication when last TID range is empty. TO TEST

Modified:
    trunk/neo/storage/database/btree.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/database/mysqldb.py
    trunk/neo/storage/handlers/replication.py
    trunk/neo/tests/storage/testReplicationHandler.py
    trunk/neo/tests/storage/testStorageDBTests.py

Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Dec  7 22:25:15 2010
@@ -440,7 +440,7 @@ class BTreeDatabaseManager(DatabaseManag
         def same_partition(key, _):
             return key % num_partitions == partition
         batchDelete(self._trans, same_partition,
-            iter_kw={'min': tid, 'excludemin': True})
+            iter_kw={'min': tid})
 
     def deleteObject(self, oid, serial=None):
         u64 = util.u64
@@ -477,7 +477,7 @@ class BTreeDatabaseManager(DatabaseManag
                 pass
             else:
                 batchDelete(tserial, lambda _, __: True,
-                    iter_kw={'min': serial, 'excludemin': True})
+                    iter_kw={'min': serial})
         def same_partition(key, _):
             return key % num_partitions == partition
         batchDelete(obj, same_partition,

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Dec  7 22:25:15 2010
@@ -389,7 +389,7 @@ class DatabaseManager(object):
         raise NotImplementedError
 
     def deleteTransactionsAbove(self, num_partitions, partition, tid):
-        """Delete all transactions above given TID (excluded) in given
+        """Delete all transactions above given TID (inclued) in given
         partition."""
         raise NotImplementedError
 
@@ -399,7 +399,7 @@ class DatabaseManager(object):
         raise NotImplementedError
 
     def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
-        """Delete all objects above given OID and serial (excluded) in given
+        """Delete all objects above given OID and serial (inclued) in given
         partition."""
         raise NotImplementedError
 

Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Dec  7 22:25:15 2010
@@ -561,7 +561,7 @@ class MySQLDatabaseManager(DatabaseManag
         self.begin()
         try:
             self.query('DELETE FROM trans WHERE partition=%(partition)d AND '
-              'tid > %(tid)d' % {
+              'tid >= %(tid)d' % {
                 'partition': partition,
                 'tid': util.u64(tid),
             })
@@ -595,7 +595,7 @@ class MySQLDatabaseManager(DatabaseManag
         self.begin()
         try:
             self.query('DELETE FROM obj WHERE partition=%(partition)d AND ('
-              'oid > %(oid)d OR (oid = %(oid)d AND serial > %(serial)d))' % {
+              'oid > %(oid)d OR (oid = %(oid)d AND serial >= %(serial)d))' % {
                 'partition': partition,
                 'oid': u64(oid),
                 'serial': u64(serial),

Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Dec  7 22:25:15 2010
@@ -100,6 +100,7 @@ class ReplicationHandler(EventHandler):
 
     @checkConnectionIsReplicatorConnection
     def answerTIDsFrom(self, conn, tid_list):
+        assert tid_list
         app = self.app
         ask = conn.ask
         # If I have pending TIDs, check which TIDs I don't have, and
@@ -111,13 +112,10 @@ class ReplicationHandler(EventHandler):
             deleteTransaction = app.dm.deleteTransaction
             for tid in extra_tid_set:
                 deleteTransaction(tid)
-        if tid_list:
-            missing_tid_set = tid_set - my_tid_set
-            for tid in missing_tid_set:
-                ask(Packets.AskTransactionInformation(tid), timeout=300)
-            ask(self._doAskCheckTIDRange(add64(tid_list[-1], 1), RANGE_LENGTH))
-        else:
-            ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
+        missing_tid_set = tid_set - my_tid_set
+        for tid in missing_tid_set:
+            ask(Packets.AskTransactionInformation(tid), timeout=300)
+        ask(self._doAskCheckTIDRange(add64(tid_list[-1], 1), RANGE_LENGTH))
 
     @checkConnectionIsReplicatorConnection
     def answerTransactionInformation(self, conn, tid,
@@ -129,19 +127,17 @@ class ReplicationHandler(EventHandler):
 
     @checkConnectionIsReplicatorConnection
     def answerObjectHistoryFrom(self, conn, object_dict):
+        assert object_dict
         app = self.app
         ask = conn.ask
         deleteObject = app.dm.deleteObject
         my_object_dict = app.replicator.getObjectHistoryFromResult()
         object_set = set()
-        if object_dict:
-            max_oid = max(object_dict.iterkeys())
-            max_serial = max(object_dict[max_oid])
-            for oid, serial_list in object_dict.iteritems():
-                for serial in serial_list:
-                    object_set.add((oid, serial))
-        else:
-            max_oid = None
+        max_oid = max(object_dict.iterkeys())
+        max_serial = max(object_dict[max_oid])
+        for oid, serial_list in object_dict.iteritems():
+            for serial in serial_list:
+                object_set.add((oid, serial))
         my_object_set = set()
         for oid, serial_list in my_object_dict.iteritems():
             filter = lambda x: True
@@ -156,14 +152,11 @@ class ReplicationHandler(EventHandler):
         extra_object_set = my_object_set - object_set
         for oid, serial in extra_object_set:
             deleteObject(oid, serial)
-        if object_dict:
-            missing_object_set = object_set - my_object_set
-            for oid, serial in missing_object_set:
-                ask(Packets.AskObject(oid, serial, None), timeout=300)
-            ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1),
-                RANGE_LENGTH))
-        else:
-            self.app.replicator.setReplicationDone()
+        missing_object_set = object_set - my_object_set
+        for oid, serial in missing_object_set:
+            ask(Packets.AskObject(oid, serial, None), timeout=300)
+        ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1),
+            RANGE_LENGTH))
 
     @checkConnectionIsReplicatorConnection
     def answerObject(self, conn, oid, serial_start,
@@ -228,10 +221,15 @@ class ReplicationHandler(EventHandler):
             else:
                 # No more chunks.
                 action = CHECK_DONE
-                params = None
+                params = (next_boundary, )
         else:
             # We must recheck current chunk.
-            if length <= MIN_RANGE_LENGTH:
+            if count == 0:
+                # Reference storage has no data for this chunk, stop and
+                # truncate.
+                action = CHECK_DONE
+                params = (current_boundary, )
+            elif length <= MIN_RANGE_LENGTH:
                 # We are already at minimum chunk length, replicate.
                 action = CHECK_REPLICATE
                 params = (recheck_min_boundary, )
@@ -259,18 +257,21 @@ class ReplicationHandler(EventHandler):
             ask(self._doAskTIDsFrom(min_tid, count))
             if length != count:
                 action = CHECK_DONE
+                params = (next_tid, )
         if action == CHECK_CHUNK:
             (min_tid, count) = params
             if min_tid >= replicator.getCurrentCriticalTID():
                 # Stop if past critical TID
                 action = CHECK_DONE
+                params = (next_tid, )
             else:
                 ask(self._doAskCheckTIDRange(min_tid, count))
         if action == CHECK_DONE:
             # Delete all transactions we might have which are beyond what peer
             # knows.
+            (last_tid, ) = params
             app.dm.deleteTransactionsAbove(app.pt.getPartitions(),
-              replicator.getCurrentRID(), max_tid)
+                replicator.getCurrentRID(), last_tid)
             # If no more TID, a replication of transactions is finished.
             # So start to replicate objects now.
             ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
@@ -291,14 +292,16 @@ class ReplicationHandler(EventHandler):
             ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count))
             if length != count:
                 action = CHECK_DONE
+                params = (next_params, )
         if action == CHECK_CHUNK:
             ((min_oid, min_serial), count) = params
             ask(self._doAskCheckSerialRange(min_oid, min_serial, count))
         if action == CHECK_DONE:
             # Delete all objects we might have which are beyond what peer
             # knows.
+            ((last_oid, last_serial), ) = params
             app.dm.deleteObjectsAbove(app.pt.getPartitions(),
-              replicator.getCurrentRID(), max_oid, max_serial)
+              replicator.getCurrentRID(), last_oid, last_serial)
             # Nothing remains, so the replication for this partition is
             # finished.
             replicator.setReplicationDone()

Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Tue Dec  7 22:25:15 2010
@@ -207,13 +207,6 @@ class StorageReplicationHandlerTests(Neo
         calls = app.dm.mockGetNamedCalls('deleteTransaction')
         self.assertEqual(len(calls), 1)
         calls[0].checkArgs(tid_list[0])
-        # Peer has no transaction above requested min, go on with object
-        # replication after deleting local transactions
-        conn = self.getFakeConnection()
-        known_tid_list = [tid_list[0], ]
-        app = self.getApp(conn=conn, tid_result=known_tid_list)
-        ReplicationHandler(app).answerTIDsFrom(conn, [])
-        self.checkAskPacket(conn, Packets.AskCheckSerialRange)
 
     def test_answerTransactionInformation(self):
         conn = self.getFakeConnection()
@@ -276,22 +269,6 @@ class StorageReplicationHandlerTests(Neo
             (oid_4, tid_list[4]),
         ))
         self.assertEqual(actual_deletes, expected_deletes)
-        # Peer has no object above requested min, replication is over for this
-        # transaction once we deleted local content.
-        oid_dict = FakeDict(())
-        conn = self.getFakeConnection()
-        app = self.getApp(conn=conn, history_result={
-            oid_1: [tid_list[2]],
-        })
-        ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict)
-        calls = app.dm.mockGetNamedCalls('deleteObject')
-        actual_deletes = set(((x.getParam(0), x.getParam(1)) for x in calls))
-        expected_deletes = set((
-            (oid_1, tid_list[2]),
-        ))
-        self.assertEqual(actual_deletes, expected_deletes)
-        calls = app.replicator.mockGetNamedCalls('setReplicationDone')
-        self.assertEqual(len(calls), 1)
 
     def test_answerObject(self):
         conn = self.getFakeConnection()
@@ -411,7 +388,7 @@ class StorageReplicationHandlerTests(Neo
         # ...and delete partition tail
         calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(num_partitions, rid, max_tid)
+        calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1))
 
     def test_answerCheckTIDRangeDifferentBigChunk(self):
         min_tid = self.getNextTID()
@@ -562,7 +539,7 @@ class StorageReplicationHandlerTests(Neo
         # ...and delete partition tail
         calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(num_partitions, rid, max_oid, max_serial)
+        calls[0].checkArgs(num_partitions, rid, max_oid, add64(max_serial, 1))
 
     def test_answerCheckSerialRangeDifferentBigChunk(self):
         min_oid = self.getOID(1)
@@ -656,7 +633,7 @@ class StorageReplicationHandlerTests(Neo
         # ...and delete partition tail
         calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(num_partitions, rid, max_oid, max_serial)
+        calls[0].checkArgs(num_partitions, rid, max_oid, add64(max_serial, 1))
 
 if __name__ == "__main__":
     unittest.main()

Modified: trunk/neo/tests/storage/testStorageDBTests.py
==============================================================================
--- trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] Tue Dec  7 22:25:15 2010
@@ -344,7 +344,7 @@ class StorageDBTests(NeoUnitTestBase):
             txn, objs = self.getTransaction([oid1])
             self.db.storeTransaction(tid, objs, txn)
             self.db.finishTransaction(tid)
-        self.db.deleteTransactionsAbove(2, 0, tid1)
+        self.db.deleteTransactionsAbove(2, 0, tid2)
         # Right partition, below cutoff
         self.assertNotEqual(self.db.getTransaction(tid1, True), None)
         # Wrong partition, above cutoff
@@ -381,7 +381,7 @@ class StorageDBTests(NeoUnitTestBase):
             txn, objs = self.getTransaction([oid1, oid2, oid3])
             self.db.storeTransaction(tid, objs, txn)
             self.db.finishTransaction(tid)
-        self.db.deleteObjectsAbove(2, 0, oid1, tid1)
+        self.db.deleteObjectsAbove(2, 0, oid1, tid2)
         # Right partition, below cutoff
         self.assertNotEqual(self.db.getObject(oid1, tid=tid1), None)
         # Right partition, above tid cutoff




More information about the Neo-report mailing list