[Neo-report] r2405 vincent - in /trunk/neo: storage/handlers/ tests/storage/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Nov 2 16:21:57 CET 2010
Author: vincent
Date: Tue Nov 2 16:21:56 2010
New Revision: 2405
Log:
Implement true binary search in replication.
If the current chunk size is less than the maximum, the previous check on the
enclosing chunk must have found a difference. If the current, smaller check
succeeds, move on to the other half and divide the size by two.
Also, factor out the common logic of the replication "check...Range" handlers.
Modified:
trunk/neo/storage/handlers/replication.py
trunk/neo/tests/storage/testReplicationHandler.py
Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Nov 2 16:21:56 2010
@@ -26,6 +26,10 @@ from neo import util
RANGE_LENGTH = 4000
MIN_RANGE_LENGTH = 1000
+CHECK_CHUNK = 0
+CHECK_REPLICATE = 1
+CHECK_DONE = 2
+
"""
Replication algorythm
@@ -179,66 +183,99 @@ class ReplicationHandler(EventHandler):
return Packets.AskObjectHistoryFrom(min_oid, min_serial, max_serial,
length, partition_id)
- @checkConnectionIsReplicatorConnection
- def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
- max_tid):
- replicator = self.app.replicator
- our = replicator.getTIDCheckResult(min_tid, length)
- his = (count, tid_checksum, max_tid)
- p = None
- if our != his:
- # Something is different...
+ def _checkRange(self, match, current_boundary, next_boundary, length,
+ count):
+ if match:
+ # Same data on both sides
+ if length < RANGE_LENGTH and length == count:
+ # ...and previous check detected a difference - and we still
+ # haven't reached the end. This means that we just check the
+ # first half of a chunk which, as a whole, is different. So
+ # next test must happen on the next chunk.
+ recheck_min_boundary = next_boundary
+ else:
+ # ...and we just checked a whole chunk, move on to the next
+ # one.
+ recheck_min_boundary = None
+ else:
+ # Something is different in current chunk
+ recheck_min_boundary = current_boundary
+ if recheck_min_boundary is None:
+ if count == length:
+ # Go on with next chunk
+ action = CHECK_CHUNK
+ params = (next_boundary, RANGE_LENGTH)
+ else:
+ # No more chunks.
+ action = CHECK_DONE
+ params = None
+ else:
+ # We must recheck current chunk.
if length <= MIN_RANGE_LENGTH:
# We are already at minimum chunk length, replicate.
- conn.ask(self._doAskTIDsFrom(min_tid, count))
+ action = CHECK_REPLICATE
+ params = (recheck_min_boundary, )
else:
# Check a smaller chunk.
- # Note: this could be made into a real binary search, but is
- # it really worth the work ?
# Note: +1, so we can detect we reached the end when answer
# comes back.
- p = self._doAskCheckTIDRange(min_tid, min(length / 2,
- count + 1))
- if p is None:
- if count == length and \
- max_tid < replicator.getCurrentCriticalTID():
- # Go on with next chunk
- p = self._doAskCheckTIDRange(add64(max_tid, 1))
+ action = CHECK_CHUNK
+ params = (recheck_min_boundary, min(length / 2, count + 1))
+ return action, params
+
+ @checkConnectionIsReplicatorConnection
+ def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
+ max_tid):
+ ask = conn.ask
+ replicator = self.app.replicator
+ next_tid = add64(max_tid, 1)
+ action, params = self._checkRange(
+ replicator.getTIDCheckResult(min_tid, length) == (
+ count, tid_checksum, max_tid), min_tid, next_tid, length,
+ count)
+ if action == CHECK_REPLICATE:
+ (min_tid, ) = params
+ ask(self._doAskTIDsFrom(min_tid, count))
+ if length == count:
+ action = CHECK_CHUNK
+ params = (next_tid, RANGE_LENGTH)
else:
- # If no more TID, a replication of transactions is finished.
- # So start to replicate objects now.
- p = self._doAskCheckSerialRange(ZERO_OID, ZERO_TID)
- conn.ask(p)
+ action = CHECK_DONE
+ if action == CHECK_CHUNK:
+ (min_tid, count) = params
+ if min_tid >= replicator.getCurrentCriticalTID():
+ # Stop if past critical TID
+ action = CHECK_DONE
+ else:
+ ask(self._doAskCheckTIDRange(min_tid, count))
+ if action == CHECK_DONE:
+ # If no more TID, a replication of transactions is finished.
+ # So start to replicate objects now.
+ ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
@checkConnectionIsReplicatorConnection
def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
oid_checksum, max_oid, serial_checksum, max_serial):
+ ask = conn.ask
replicator = self.app.replicator
- our = replicator.getSerialCheckResult(min_oid, min_serial, length)
- his = (count, oid_checksum, max_oid, serial_checksum, max_serial)
- p = None
- if our != his:
- # Something is different...
- if length <= MIN_RANGE_LENGTH:
- # We are already at minimum chunk length, replicate.
- conn.ask(self._doAskObjectHistoryFrom(min_oid, min_serial,
- count))
- else:
- # Check a smaller chunk.
- # Note: this could be made into a real binary search, but is
- # it really worth the work ?
- # Note: +1, so we can detect we reached the end when answer
- # comes back.
- p = self._doAskCheckSerialRange(min_oid, min_serial,
- min(length / 2, count + 1))
- if p is None:
- if count == length:
- # Go on with next chunk
- p = self._doAskCheckSerialRange(max_oid, add64(max_serial, 1))
+ next_params = (max_oid, add64(max_serial, 1))
+ action, params = self._checkRange(
+ replicator.getSerialCheckResult(min_oid, min_serial, length) == (
+ count, oid_checksum, max_oid, serial_checksum, max_serial),
+ (min_oid, min_serial), next_params, length, count)
+ if action == CHECK_REPLICATE:
+ ((min_oid, min_serial), ) = params
+ ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count))
+ if length == count:
+ action = CHECK_CHUNK
+ params = (next_params, RANGE_LENGTH)
else:
- # Nothing remains, so the replication for this partition is
- # finished.
- replicator.setReplicationDone()
- if p is not None:
- conn.ask(p)
+ action = CHECK_DONE
+ if action == CHECK_CHUNK:
+ ((min_oid, min_serial), count) = params
+ ask(self._doAskCheckSerialRange(min_oid, min_serial, count))
+ if action == CHECK_DONE:
+ # Nothing remains, so the replication for this partition is
+ # finished.
+ replicator.setReplicationDone()
Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Tue Nov 2 16:21:56 2010
@@ -237,12 +237,12 @@ class StorageReplicationHandlerTests(Neo
data_serial)], None, False)
# CheckTIDRange
- def test_answerCheckTIDRangeIdenticalChunkWithNext(self):
+ def test_answerCheckTIDFullRangeIdenticalChunkWithNext(self):
min_tid = self.getNextTID()
max_tid = self.getNextTID()
critical_tid = self.getNextTID()
assert max_tid < critical_tid
- length = RANGE_LENGTH / 2
+ length = RANGE_LENGTH
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
@@ -261,6 +261,30 @@ class StorageReplicationHandlerTests(Neo
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, plength, ppartition)
+ def test_answerCheckTIDSmallRangeIdenticalChunkWithNext(self):
+ min_tid = self.getNextTID()
+ max_tid = self.getNextTID()
+ critical_tid = self.getNextTID()
+ assert max_tid < critical_tid
+ length = RANGE_LENGTH / 2
+ rid = 12
+ conn = self.getFakeConnection()
+ app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
+ conn=conn, critical_tid=critical_tid)
+ handler = ReplicationHandler(app)
+ # Peer has the same data as we have: length, checksum and max_tid
+ # match.
+ handler.answerCheckTIDRange(conn, min_tid, length, length, 0, max_tid)
+ # Result: go on with next chunk
+ pmin_tid, plength, ppartition = self.checkAskPacket(conn,
+ Packets.AskCheckTIDRange, decode=True)
+ self.assertEqual(pmin_tid, add64(max_tid, 1))
+ self.assertEqual(plength, length / 2)
+ self.assertEqual(ppartition, rid)
+ calls = app.replicator.mockGetNamedCalls('checkTIDRange')
+ self.assertEqual(len(calls), 1)
+ calls[0].checkArgs(pmin_tid, plength, ppartition)
+
def test_answerCheckTIDRangeIdenticalChunkAboveCriticalTID(self):
critical_tid = self.getNextTID()
min_tid = self.getNextTID()
@@ -409,12 +433,12 @@ class StorageReplicationHandlerTests(Neo
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
# CheckSerialRange
- def test_answerCheckSerialRangeIdenticalChunkWithNext(self):
+ def test_answerCheckSerialFullRangeIdenticalChunkWithNext(self):
min_oid = self.getOID(1)
max_oid = self.getOID(10)
min_serial = self.getNextTID()
max_serial = self.getNextTID()
- length = RANGE_LENGTH / 2
+ length = RANGE_LENGTH
rid = 12
conn = self.getFakeConnection()
app = self.getApp(serial_check_result=(length, 0, max_oid, 1,
@@ -434,6 +458,31 @@ class StorageReplicationHandlerTests(Neo
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+ def test_answerCheckSerialSmallRangeIdenticalChunkWithNext(self):
+ min_oid = self.getOID(1)
+ max_oid = self.getOID(10)
+ min_serial = self.getNextTID()
+ max_serial = self.getNextTID()
+ length = RANGE_LENGTH / 2
+ rid = 12
+ conn = self.getFakeConnection()
+ app = self.getApp(serial_check_result=(length, 0, max_oid, 1,
+ max_serial), rid=rid, conn=conn)
+ handler = ReplicationHandler(app)
+ # Peer has the same data as we have
+ handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
+ length, 0, max_oid, 1, max_serial)
+ # Result: go on with next chunk
+ pmin_oid, pmin_serial, plength, ppartition = self.checkAskPacket(conn,
+ Packets.AskCheckSerialRange, decode=True)
+ self.assertEqual(pmin_oid, max_oid)
+ self.assertEqual(pmin_serial, add64(max_serial, 1))
+ self.assertEqual(plength, length / 2)
+ self.assertEqual(ppartition, rid)
+ calls = app.replicator.mockGetNamedCalls('checkSerialRange')
+ self.assertEqual(len(calls), 1)
+ calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+
def test_answerCheckSerialRangeIdenticalChunkWithoutNext(self):
min_oid = self.getOID(1)
max_oid = self.getOID(10)
More information about the Neo-report
mailing list