[Neo-report] r2777 jm - in /trunk/neo/storage: database/ handlers/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Jun 14 12:04:20 CEST 2011
Author: jm
Date: Tue Jun 14 12:04:20 2011
New Revision: 2777
Log:
storage: fix replication when clients already stored some objects
Modified:
trunk/neo/storage/database/btree.py
trunk/neo/storage/database/manager.py
trunk/neo/storage/database/mysqldb.py
trunk/neo/storage/handlers/replication.py
Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -453,11 +453,13 @@ class BTreeDatabaseManager(DatabaseManag
prune(obj[oid])
del obj[oid]
- def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+ def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
+ max_tid):
obj = self._obj
u64 = util.u64
oid = u64(oid)
serial = u64(serial)
+ max_tid = u64(max_tid)
if oid % num_partitions == partition:
try:
tserial = obj[oid]
@@ -465,11 +467,12 @@ class BTreeDatabaseManager(DatabaseManag
pass
else:
batchDelete(tserial, lambda _, __: True,
- iter_kw={'min': serial})
+ iter_kw={'min': serial, 'max': max_tid})
def same_partition(key, _):
return key % num_partitions == partition
batchDelete(obj, same_partition,
- iter_kw={'min': oid, 'excludemin': True}, recycle_subtrees=True)
+ iter_kw={'min': oid, 'excludemin': True, 'max': max_tid},
+ recycle_subtrees=True)
def getTransaction(self, tid, all=False):
tid = util.u64(tid)
Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -401,9 +401,11 @@ class DatabaseManager(object):
given oid."""
raise NotImplementedError
- def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+ def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
+ max_tid):
"""Delete all objects above given OID and serial (inclued) in given
- partition."""
+ partition, but never above max_tid (in case objects are stored during
+ replication)"""
raise NotImplementedError
def getTransaction(self, tid, all = False):
Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -574,13 +574,16 @@ class MySQLDatabaseManager(DatabaseManag
raise
self.commit()
- def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+ def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
+ max_tid):
u64 = util.u64
self.begin()
try:
- self.objQuery('DELETE FROM %%(table)s WHERE partition=%(partition)d AND ('
+ self.objQuery('DELETE FROM %%(table)s WHERE partition=%(partition)d'
+ ' AND serial <= %(max_tid)d AND ('
'oid > %(oid)d OR (oid = %(oid)d AND serial >= %(serial)d))' % {
'partition': partition,
+ 'max_tid': u64(max_tid),
'oid': u64(oid),
'serial': u64(serial),
})
Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Jun 14 12:04:20 2011
@@ -21,7 +21,7 @@ import neo.lib
from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, ZERO_TID, ZERO_OID
-from neo.lib.util import add64
+from neo.lib.util import add64, u64
# TODO: benchmark how different values behave
RANGE_LENGTH = 4000
@@ -315,8 +315,16 @@ class ReplicationHandler(EventHandler):
# Delete all objects we might have which are beyond what peer
# knows.
((last_oid, last_serial), ) = params
+ offset = replicator.getCurrentOffset()
+ max_tid = replicator.getCurrentCriticalTID()
+ neo.lib.logging.debug("Serial range checked (offset=%s, min_oid=%x,"
+ " min_serial=%x, length=%s, count=%s, max_oid=%x,"
+ " max_serial=%x, last_oid=%x, last_serial=%x, critical_tid=%x)",
+ offset, u64(min_oid), u64(min_serial), length, count,
+ u64(max_oid), u64(max_serial), u64(last_oid), u64(last_serial),
+ u64(max_tid))
app.dm.deleteObjectsAbove(app.pt.getPartitions(),
- replicator.getCurrentOffset(), last_oid, last_serial)
+ offset, last_oid, last_serial, max_tid)
# Nothing remains, so the replication for this partition is
# finished.
replicator.setReplicationDone()
More information about the Neo-report
mailing list