[Neo-report] r2780 jm - in /trunk/neo: storage/database/ storage/handlers/ tests/storage/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Jun 14 14:29:59 CEST 2011
Author: jm
Date: Tue Jun 14 14:29:59 2011
New Revision: 2780
Log:
storage: fix replication when clients already committed transactions
Similar to r2777 but for transactions instead of objects.
Modified:
trunk/neo/storage/database/btree.py
trunk/neo/storage/database/manager.py
trunk/neo/storage/database/mysqldb.py
trunk/neo/storage/handlers/replication.py
trunk/neo/tests/storage/testReplicationHandler.py
trunk/neo/tests/storage/testStorageDBTests.py
Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -423,12 +423,11 @@ class BTreeDatabaseManager(DatabaseManag
except KeyError:
pass
- def deleteTransactionsAbove(self, num_partitions, partition, tid):
- tid = util.u64(tid)
+ def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
def same_partition(key, _):
return key % num_partitions == partition
batchDelete(self._trans, same_partition,
- iter_kw={'min': tid})
+ iter_kw={'min': util.u64(tid), 'max': util.u64(max_tid)})
def deleteObject(self, oid, serial=None):
u64 = util.u64
Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -391,9 +391,10 @@ class DatabaseManager(object):
an oid list"""
raise NotImplementedError
- def deleteTransactionsAbove(self, num_partitions, partition, tid):
+ def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
"""Delete all transactions above given TID (inclued) in given
- partition."""
+ partition, but never above max_tid (in case transactions are committed
+ during replication)."""
raise NotImplementedError
def deleteObject(self, oid, serial=None):
Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -541,13 +541,14 @@ class MySQLDatabaseManager(DatabaseManag
raise
self.commit()
- def deleteTransactionsAbove(self, num_partitions, partition, tid):
+ def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
self.begin()
try:
self.query('DELETE FROM trans WHERE partition=%(partition)d AND '
- 'tid >= %(tid)d' % {
+ '%(tid)d <= tid AND tid <= %(max_tid)d' % {
'partition': partition,
'tid': util.u64(tid),
+ 'max_tid': util.u64(max_tid),
})
except:
self.rollback()
Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -256,6 +256,7 @@ class ReplicationHandler(EventHandler):
@checkConnectionIsReplicatorConnection
def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
max_tid):
+ pkt_min_tid = min_tid
ask = conn.ask
app = self.app
replicator = app.replicator
@@ -264,6 +265,7 @@ class ReplicationHandler(EventHandler):
replicator.getTIDCheckResult(min_tid, length) == (
count, tid_checksum, max_tid), min_tid, next_tid, length,
count)
+ critical_tid = replicator.getCurrentCriticalTID()
if action == CHECK_REPLICATE:
(min_tid, ) = params
ask(self._doAskTIDsFrom(min_tid, count))
@@ -272,23 +274,26 @@ class ReplicationHandler(EventHandler):
params = (next_tid, )
if action == CHECK_CHUNK:
(min_tid, count) = params
- if min_tid >= replicator.getCurrentCriticalTID():
+ if min_tid >= critical_tid:
# Stop if past critical TID
action = CHECK_DONE
params = (next_tid, )
else:
- max_tid = replicator.getCurrentCriticalTID()
- ask(self._doAskCheckTIDRange(min_tid, max_tid, count))
+ ask(self._doAskCheckTIDRange(min_tid, critical_tid, count))
if action == CHECK_DONE:
# Delete all transactions we might have which are beyond what peer
# knows.
(last_tid, ) = params
+ offset = replicator.getCurrentOffset()
+ neo.lib.logging.debug("TID range checked (offset=%s, min_tid=%x,"
+ " length=%s, count=%s, max_tid=%x, last_tid=%x,"
+ " critical_tid=%x)", offset, u64(pkt_min_tid), length, count,
+ u64(max_tid), u64(last_tid), u64(critical_tid))
app.dm.deleteTransactionsAbove(app.pt.getPartitions(),
- replicator.getCurrentOffset(), last_tid)
+ offset, last_tid, critical_tid)
# If no more TID, a replication of transactions is finished.
# So start to replicate objects now.
- max_tid = replicator.getCurrentCriticalTID()
- ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID, max_tid))
+ ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID, critical_tid))
@checkConnectionIsReplicatorConnection
def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -380,7 +380,7 @@ class StorageReplicationHandlerTests(Neo
# ...and delete partition tail
calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove')
self.assertEqual(len(calls), 1)
- calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1))
+ calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1), ZERO_TID)
def test_answerCheckTIDRangeDifferentBigChunk(self):
min_tid = self.getNextTID()
Modified: trunk/neo/tests/storage/testStorageDBTests.py
==============================================================================
--- trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -346,7 +346,7 @@ class StorageDBTests(NeoUnitTestBase):
txn, objs = self.getTransaction([oid1])
self.db.storeTransaction(tid, objs, txn)
self.db.finishTransaction(tid)
- self.db.deleteTransactionsAbove(2, 0, tid2)
+ self.db.deleteTransactionsAbove(2, 0, tid2, tid3)
# Right partition, below cutoff
self.assertNotEqual(self.db.getTransaction(tid1, True), None)
# Wrong partition, above cutoff
More information about the Neo-report
mailing list