[Neo-report] r2777 jm - in /trunk/neo/storage: database/ handlers/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Jun 14 12:04:20 CEST 2011


Author: jm
Date: Tue Jun 14 12:04:20 2011
New Revision: 2777

Log:
storage: fix replication when clients already stored some objects

Modified:
    trunk/neo/storage/database/btree.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/database/mysqldb.py
    trunk/neo/storage/handlers/replication.py

Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -453,11 +453,13 @@ class BTreeDatabaseManager(DatabaseManag
                 prune(obj[oid])
                 del obj[oid]
 
-    def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+    def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
+                           max_tid):
         obj = self._obj
         u64 = util.u64
         oid = u64(oid)
         serial = u64(serial)
+        max_tid = u64(max_tid)
         if oid % num_partitions == partition:
             try:
                 tserial = obj[oid]
@@ -465,11 +467,12 @@ class BTreeDatabaseManager(DatabaseManag
                 pass
             else:
                 batchDelete(tserial, lambda _, __: True,
-                    iter_kw={'min': serial})
+                    iter_kw={'min': serial, 'max': max_tid})
         def same_partition(key, _):
             return key % num_partitions == partition
         batchDelete(obj, same_partition,
-            iter_kw={'min': oid, 'excludemin': True}, recycle_subtrees=True)
+            iter_kw={'min': oid, 'excludemin': True, 'max': max_tid},
+            recycle_subtrees=True)
 
     def getTransaction(self, tid, all=False):
         tid = util.u64(tid)

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -401,9 +401,11 @@ class DatabaseManager(object):
         given oid."""
         raise NotImplementedError
 
-    def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+    def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
+            max_tid):
         """Delete all objects above given OID and serial (inclued) in given
-        partition."""
+        partition, but never above max_tid (in case objects are stored during
+        replication)"""
         raise NotImplementedError
 
     def getTransaction(self, tid, all = False):

Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -574,13 +574,16 @@ class MySQLDatabaseManager(DatabaseManag
             raise
         self.commit()
 
-    def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+    def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
+                           max_tid):
         u64 = util.u64
         self.begin()
         try:
-            self.objQuery('DELETE FROM %%(table)s WHERE partition=%(partition)d AND ('
+            self.objQuery('DELETE FROM %%(table)s WHERE partition=%(partition)d'
+              ' AND serial <= %(max_tid)d AND ('
               'oid > %(oid)d OR (oid = %(oid)d AND serial >= %(serial)d))' % {
                 'partition': partition,
+                'max_tid': u64(max_tid),
                 'oid': u64(oid),
                 'serial': u64(serial),
             })

Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -21,7 +21,7 @@ import neo.lib
 
 from neo.lib.handler import EventHandler
 from neo.lib.protocol import Packets, ZERO_TID, ZERO_OID
-from neo.lib.util import add64
+from neo.lib.util import add64, u64
 
 # TODO: benchmark how different values behave
 RANGE_LENGTH = 4000
@@ -315,8 +315,16 @@ class ReplicationHandler(EventHandler):
             # Delete all objects we might have which are beyond what peer
             # knows.
             ((last_oid, last_serial), ) = params
+            offset = replicator.getCurrentOffset()
+            max_tid = replicator.getCurrentCriticalTID()
+            neo.lib.logging.debug("Serial range checked (offset=%s, min_oid=%x,"
+                " min_serial=%x, length=%s, count=%s, max_oid=%x,"
+                " max_serial=%x, last_oid=%x, last_serial=%x, critical_tid=%x)",
+                offset, u64(min_oid), u64(min_serial), length, count,
+                u64(max_oid), u64(max_serial), u64(last_oid), u64(last_serial),
+                u64(max_tid))
             app.dm.deleteObjectsAbove(app.pt.getPartitions(),
-              replicator.getCurrentOffset(), last_oid, last_serial)
+                offset, last_oid, last_serial, max_tid)
             # Nothing remains, so the replication for this partition is
             # finished.
             replicator.setReplicationDone()




More information about the Neo-report mailing list