[Neo-report] r2780 jm - in /trunk/neo: storage/database/ storage/handlers/ tests/storage/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Jun 14 14:29:59 CEST 2011


Author: jm
Date: Tue Jun 14 14:29:59 2011
New Revision: 2780

Log:
storage: fix replication when clients already committed transactions

Similar to r2777 but for transactions instead of objects.

Modified:
    trunk/neo/storage/database/btree.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/database/mysqldb.py
    trunk/neo/storage/handlers/replication.py
    trunk/neo/tests/storage/testReplicationHandler.py
    trunk/neo/tests/storage/testStorageDBTests.py

Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -423,12 +423,11 @@ class BTreeDatabaseManager(DatabaseManag
         except KeyError:
             pass
 
-    def deleteTransactionsAbove(self, num_partitions, partition, tid):
-        tid = util.u64(tid)
+    def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
         def same_partition(key, _):
             return key % num_partitions == partition
         batchDelete(self._trans, same_partition,
-            iter_kw={'min': tid})
+            iter_kw={'min': util.u64(tid), 'max': util.u64(max_tid)})
 
     def deleteObject(self, oid, serial=None):
         u64 = util.u64

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -391,9 +391,10 @@ class DatabaseManager(object):
         an oid list"""
         raise NotImplementedError
 
-    def deleteTransactionsAbove(self, num_partitions, partition, tid):
+    def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
         """Delete all transactions above given TID (inclued) in given
-        partition."""
+        partition, but never above max_tid (in case transactions are committed
+        during replication)."""
         raise NotImplementedError
 
     def deleteObject(self, oid, serial=None):

Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -541,13 +541,14 @@ class MySQLDatabaseManager(DatabaseManag
             raise
         self.commit()
 
-    def deleteTransactionsAbove(self, num_partitions, partition, tid):
+    def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
         self.begin()
         try:
             self.query('DELETE FROM trans WHERE partition=%(partition)d AND '
-              'tid >= %(tid)d' % {
+              '%(tid)d <= tid AND tid <= %(max_tid)d' % {
                 'partition': partition,
                 'tid': util.u64(tid),
+                'max_tid': util.u64(max_tid),
             })
         except:
             self.rollback()

Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -256,6 +256,7 @@ class ReplicationHandler(EventHandler):
     @checkConnectionIsReplicatorConnection
     def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
             max_tid):
+        pkt_min_tid = min_tid
         ask = conn.ask
         app = self.app
         replicator = app.replicator
@@ -264,6 +265,7 @@ class ReplicationHandler(EventHandler):
             replicator.getTIDCheckResult(min_tid, length) == (
             count, tid_checksum, max_tid), min_tid, next_tid, length,
             count)
+        critical_tid = replicator.getCurrentCriticalTID()
         if action == CHECK_REPLICATE:
             (min_tid, ) = params
             ask(self._doAskTIDsFrom(min_tid, count))
@@ -272,23 +274,26 @@ class ReplicationHandler(EventHandler):
                 params = (next_tid, )
         if action == CHECK_CHUNK:
             (min_tid, count) = params
-            if min_tid >= replicator.getCurrentCriticalTID():
+            if min_tid >= critical_tid:
                 # Stop if past critical TID
                 action = CHECK_DONE
                 params = (next_tid, )
             else:
-                max_tid = replicator.getCurrentCriticalTID()
-                ask(self._doAskCheckTIDRange(min_tid, max_tid, count))
+                ask(self._doAskCheckTIDRange(min_tid, critical_tid, count))
         if action == CHECK_DONE:
             # Delete all transactions we might have which are beyond what peer
             # knows.
             (last_tid, ) = params
+            offset = replicator.getCurrentOffset()
+            neo.lib.logging.debug("TID range checked (offset=%s, min_tid=%x,"
+                " length=%s, count=%s, max_tid=%x, last_tid=%x,"
+                " critical_tid=%x)", offset, u64(pkt_min_tid), length, count,
+                u64(max_tid), u64(last_tid), u64(critical_tid))
             app.dm.deleteTransactionsAbove(app.pt.getPartitions(),
-                replicator.getCurrentOffset(), last_tid)
+                offset, last_tid, critical_tid)
             # If no more TID, a replication of transactions is finished.
             # So start to replicate objects now.
-            max_tid = replicator.getCurrentCriticalTID()
-            ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID, max_tid))
+            ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID, critical_tid))
 
     @checkConnectionIsReplicatorConnection
     def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,

Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -380,7 +380,7 @@ class StorageReplicationHandlerTests(Neo
         # ...and delete partition tail
         calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1))
+        calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1), ZERO_TID)
 
     def test_answerCheckTIDRangeDifferentBigChunk(self):
         min_tid = self.getNextTID()

Modified: trunk/neo/tests/storage/testStorageDBTests.py
==============================================================================
--- trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] Tue Jun 14 14:29:59 2011
@@ -346,7 +346,7 @@ class StorageDBTests(NeoUnitTestBase):
             txn, objs = self.getTransaction([oid1])
             self.db.storeTransaction(tid, objs, txn)
             self.db.finishTransaction(tid)
-        self.db.deleteTransactionsAbove(2, 0, tid2)
+        self.db.deleteTransactionsAbove(2, 0, tid2, tid3)
         # Right partition, below cutoff
         self.assertNotEqual(self.db.getTransaction(tid1, True), None)
         # Wrong partition, above cutoff




More information about the Neo-report mailing list