[Neo-report] r2405 vincent - in /trunk/neo: storage/handlers/ tests/storage/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Nov 2 16:21:57 CET 2010


Author: vincent
Date: Tue Nov  2 16:21:56 2010
New Revision: 2405

Log:
Implement true binary search in replication.

If the current chunk size is less than the maximum, it means the latest check
of the whole chunk found a problem. If the current, smaller check succeeded,
move on to the other half and divide the size by two.
Also, factorise replication "check...Range" handlers.

Modified:
    trunk/neo/storage/handlers/replication.py
    trunk/neo/tests/storage/testReplicationHandler.py

Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Nov  2 16:21:56 2010
@@ -26,6 +26,10 @@ from neo import util
 RANGE_LENGTH = 4000
 MIN_RANGE_LENGTH = 1000
 
+CHECK_CHUNK = 0
+CHECK_REPLICATE = 1
+CHECK_DONE = 2
+
 """
 Replication algorythm
 
@@ -179,66 +183,99 @@ class ReplicationHandler(EventHandler):
         return Packets.AskObjectHistoryFrom(min_oid, min_serial, max_serial,
             length, partition_id)
 
-    @checkConnectionIsReplicatorConnection
-    def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
-            max_tid):
-        replicator = self.app.replicator
-        our = replicator.getTIDCheckResult(min_tid, length)
-        his = (count, tid_checksum, max_tid)
-        p = None
-        if our != his:
-            # Something is different...
+    def _checkRange(self, match, current_boundary, next_boundary, length,
+            count):
+        if match:
+            # Same data on both sides
+            if length < RANGE_LENGTH and length == count:
+                # ...and previous check detected a difference - and we still
+                # haven't reached the end. This means that we just check the
+                # first half of a chunk which, as a whole, is different. So
+                # next test must happen on the next chunk.
+                recheck_min_boundary = next_boundary
+            else:
+                # ...and we just checked a whole chunk, move on to the next
+                # one.
+                recheck_min_boundary = None
+        else:
+            # Something is different in current chunk
+            recheck_min_boundary = current_boundary
+        if recheck_min_boundary is None:
+            if count == length:
+                # Go on with next chunk
+                action = CHECK_CHUNK
+                params = (next_boundary, RANGE_LENGTH)
+            else:
+                # No more chunks.
+                action = CHECK_DONE
+                params = None
+        else:
+            # We must recheck current chunk.
             if length <= MIN_RANGE_LENGTH:
                 # We are already at minimum chunk length, replicate.
-                conn.ask(self._doAskTIDsFrom(min_tid, count))
+                action = CHECK_REPLICATE
+                params = (recheck_min_boundary, )
             else:
                 # Check a smaller chunk.
-                # Note: this could be made into a real binary search, but is
-                # it really worth the work ?
                 # Note: +1, so we can detect we reached the end when answer
                 # comes back.
-                p = self._doAskCheckTIDRange(min_tid, min(length / 2,
-                    count + 1))
-        if p is None:
-            if count == length and \
-                    max_tid < replicator.getCurrentCriticalTID():
-                # Go on with next chunk
-                p = self._doAskCheckTIDRange(add64(max_tid, 1))
+                action = CHECK_CHUNK
+                params = (recheck_min_boundary, min(length / 2, count + 1))
+        return action, params
+
+    @checkConnectionIsReplicatorConnection
+    def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
+            max_tid):
+        ask = conn.ask
+        replicator = self.app.replicator
+        next_tid = add64(max_tid, 1)
+        action, params = self._checkRange(
+            replicator.getTIDCheckResult(min_tid, length) == (
+            count, tid_checksum, max_tid), min_tid, next_tid, length,
+            count)
+        if action == CHECK_REPLICATE:
+            (min_tid, ) = params
+            ask(self._doAskTIDsFrom(min_tid, count))
+            if length == count:
+                action = CHECK_CHUNK
+                params = (next_tid, RANGE_LENGTH)
             else:
-                # If no more TID, a replication of transactions is finished.
-                # So start to replicate objects now.
-                p = self._doAskCheckSerialRange(ZERO_OID, ZERO_TID)
-        conn.ask(p)
+                action = CHECK_DONE
+        if action == CHECK_CHUNK:
+            (min_tid, count) = params
+            if min_tid >= replicator.getCurrentCriticalTID():
+                # Stop if past critical TID
+                action = CHECK_DONE
+            else:
+                ask(self._doAskCheckTIDRange(min_tid, count))
+        if action == CHECK_DONE:
+            # If no more TID, a replication of transactions is finished.
+            # So start to replicate objects now.
+            ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
 
     @checkConnectionIsReplicatorConnection
     def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
             oid_checksum, max_oid, serial_checksum, max_serial):
+        ask = conn.ask
         replicator = self.app.replicator
-        our = replicator.getSerialCheckResult(min_oid, min_serial, length)
-        his = (count, oid_checksum, max_oid, serial_checksum, max_serial)
-        p = None
-        if our != his:
-            # Something is different...
-            if length <= MIN_RANGE_LENGTH:
-                # We are already at minimum chunk length, replicate.
-                conn.ask(self._doAskObjectHistoryFrom(min_oid, min_serial,
-                  count))
-            else:
-                # Check a smaller chunk.
-                # Note: this could be made into a real binary search, but is
-                # it really worth the work ?
-                # Note: +1, so we can detect we reached the end when answer
-                # comes back.
-                p = self._doAskCheckSerialRange(min_oid, min_serial,
-                    min(length / 2, count + 1))
-        if p is None:
-            if count == length:
-                # Go on with next chunk
-                p = self._doAskCheckSerialRange(max_oid, add64(max_serial, 1))
+        next_params = (max_oid, add64(max_serial, 1))
+        action, params = self._checkRange(
+            replicator.getSerialCheckResult(min_oid, min_serial, length) == (
+            count, oid_checksum, max_oid, serial_checksum, max_serial),
+            (min_oid, min_serial), next_params, length, count)
+        if action == CHECK_REPLICATE:
+            ((min_oid, min_serial), ) = params
+            ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count))
+            if length == count:
+                action = CHECK_CHUNK
+                params = (next_params, RANGE_LENGTH)
             else:
-                # Nothing remains, so the replication for this partition is
-                # finished.
-                replicator.setReplicationDone()
-        if p is not None:
-            conn.ask(p)
+                action = CHECK_DONE
+        if action == CHECK_CHUNK:
+            ((min_oid, min_serial), count) = params
+            ask(self._doAskCheckSerialRange(min_oid, min_serial, count))
+        if action == CHECK_DONE:
+            # Nothing remains, so the replication for this partition is
+            # finished.
+            replicator.setReplicationDone()
 

Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Tue Nov  2 16:21:56 2010
@@ -237,12 +237,12 @@ class StorageReplicationHandlerTests(Neo
             data_serial)], None, False)
 
     # CheckTIDRange
-    def test_answerCheckTIDRangeIdenticalChunkWithNext(self):
+    def test_answerCheckTIDFullRangeIdenticalChunkWithNext(self):
         min_tid = self.getNextTID()
         max_tid = self.getNextTID()
         critical_tid = self.getNextTID()
         assert max_tid < critical_tid
-        length = RANGE_LENGTH / 2
+        length = RANGE_LENGTH
         rid = 12
         conn = self.getFakeConnection()
         app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
@@ -261,6 +261,30 @@ class StorageReplicationHandlerTests(Neo
         self.assertEqual(len(calls), 1)
         calls[0].checkArgs(pmin_tid, plength, ppartition)
 
+    def test_answerCheckTIDSmallRangeIdenticalChunkWithNext(self):
+        min_tid = self.getNextTID()
+        max_tid = self.getNextTID()
+        critical_tid = self.getNextTID()
+        assert max_tid < critical_tid
+        length = RANGE_LENGTH / 2
+        rid = 12
+        conn = self.getFakeConnection()
+        app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
+            conn=conn, critical_tid=critical_tid)
+        handler = ReplicationHandler(app)
+        # Peer has the same data as we have: length, checksum and max_tid
+        # match.
+        handler.answerCheckTIDRange(conn, min_tid, length, length, 0, max_tid)
+        # Result: go on with next chunk
+        pmin_tid, plength, ppartition = self.checkAskPacket(conn,
+            Packets.AskCheckTIDRange, decode=True)
+        self.assertEqual(pmin_tid, add64(max_tid, 1))
+        self.assertEqual(plength, length / 2)
+        self.assertEqual(ppartition, rid)
+        calls = app.replicator.mockGetNamedCalls('checkTIDRange')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(pmin_tid, plength, ppartition)
+
     def test_answerCheckTIDRangeIdenticalChunkAboveCriticalTID(self):
         critical_tid = self.getNextTID()
         min_tid = self.getNextTID()
@@ -409,12 +433,12 @@ class StorageReplicationHandlerTests(Neo
         calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
 
     # CheckSerialRange
-    def test_answerCheckSerialRangeIdenticalChunkWithNext(self):
+    def test_answerCheckSerialFullRangeIdenticalChunkWithNext(self):
         min_oid = self.getOID(1)
         max_oid = self.getOID(10)
         min_serial = self.getNextTID()
         max_serial = self.getNextTID()
-        length = RANGE_LENGTH / 2
+        length = RANGE_LENGTH
         rid = 12
         conn = self.getFakeConnection()
         app = self.getApp(serial_check_result=(length, 0, max_oid, 1,
@@ -434,6 +458,31 @@ class StorageReplicationHandlerTests(Neo
         self.assertEqual(len(calls), 1)
         calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
 
+    def test_answerCheckSerialSmallRangeIdenticalChunkWithNext(self):
+        min_oid = self.getOID(1)
+        max_oid = self.getOID(10)
+        min_serial = self.getNextTID()
+        max_serial = self.getNextTID()
+        length = RANGE_LENGTH / 2
+        rid = 12
+        conn = self.getFakeConnection()
+        app = self.getApp(serial_check_result=(length, 0, max_oid, 1,
+            max_serial), rid=rid, conn=conn)
+        handler = ReplicationHandler(app)
+        # Peer has the same data as we have
+        handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
+            length, 0, max_oid, 1, max_serial)
+        # Result: go on with next chunk
+        pmin_oid, pmin_serial, plength, ppartition = self.checkAskPacket(conn,
+            Packets.AskCheckSerialRange, decode=True)
+        self.assertEqual(pmin_oid, max_oid)
+        self.assertEqual(pmin_serial, add64(max_serial, 1))
+        self.assertEqual(plength, length / 2)
+        self.assertEqual(ppartition, rid)
+        calls = app.replicator.mockGetNamedCalls('checkSerialRange')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+
     def test_answerCheckSerialRangeIdenticalChunkWithoutNext(self):
         min_oid = self.getOID(1)
         max_oid = self.getOID(10)





More information about the Neo-report mailing list