[Neo-report] r2465 vincent - in /trunk/neo: storage/database/ storage/handlers/ tests/storage/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Nov 17 14:05:28 CET 2010
Author: vincent
Date: Wed Nov 17 14:05:28 2010
New Revision: 2465
Log:
Add support for complete deletion of a partition's tail during replication.
Such a tail (local data beyond what the peer knows) can exist when enough of
the most recent objects have been transactionally un-created and the database
was then packed.
Modified:
trunk/neo/storage/database/btree.py
trunk/neo/storage/database/manager.py
trunk/neo/storage/database/mysqldb.py
trunk/neo/storage/handlers/replication.py
trunk/neo/tests/storage/testReplicationHandler.py
trunk/neo/tests/storage/testStorageDBTests.py
Modified: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -435,6 +435,13 @@ class BTreeDatabaseManager(DatabaseManag
except KeyError:
pass
+ def deleteTransactionsAbove(self, num_partitions, partition, tid):
+ tid = util.u64(tid)
+ def same_partition(key, _):
+ return key % num_partitions == partition
+ batchDelete(self._trans, same_partition,
+ iter_kw={'min': tid, 'excludemin': True})
+
def deleteObject(self, oid, serial=None):
u64 = util.u64
oid = u64(oid)
@@ -458,6 +465,24 @@ class BTreeDatabaseManager(DatabaseManag
prune(obj[oid])
del obj[oid]
+ def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+ obj = self._obj
+ u64 = util.u64
+ oid = u64(oid)
+ serial = u64(serial)
+ if oid % num_partitions == partition:
+ try:
+ tserial = obj[oid]
+ except KeyError:
+ pass
+ else:
+ batchDelete(tserial, lambda _, __: True,
+ iter_kw={'min': serial, 'excludemin': True})
+ def same_partition(key, _):
+ return key % num_partitions == partition
+ batchDelete(obj, same_partition,
+ iter_kw={'min': oid, 'excludemin': True}, recycle_subtrees=True)
+
def getTransaction(self, tid, all=False):
tid = util.u64(tid)
try:
Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -388,11 +388,21 @@ class DatabaseManager(object):
an oid list"""
raise NotImplementedError
+ def deleteTransactionsAbove(self, num_partitions, partition, tid):
+ """Delete all transactions above given TID (excluded) in given
+ partition."""
+ raise NotImplementedError
+
def deleteObject(self, oid, serial=None):
"""Delete given object. If serial is given, only delete that serial for
given oid."""
raise NotImplementedError
+ def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+ """Delete all objects above given OID and serial (excluded) in given
+ partition."""
+ raise NotImplementedError
+
def getTransaction(self, tid, all = False):
"""Return a tuple of the list of OIDs, user information,
a description, and extension information, for a given transaction
Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -557,6 +557,19 @@ class MySQLDatabaseManager(DatabaseManag
raise
self.commit()
+ def deleteTransactionsAbove(self, num_partitions, partition, tid):
+ self.begin()
+ try:
+ self.query('DELETE FROM trans WHERE partition=%(partition)d AND '
+ 'tid > %(tid)d' % {
+ 'partition': partition,
+ 'tid': util.u64(tid),
+ })
+ except:
+ self.rollback()
+ raise
+ self.commit()
+
def deleteObject(self, oid, serial=None):
u64 = util.u64
oid = u64(oid)
@@ -577,6 +590,21 @@ class MySQLDatabaseManager(DatabaseManag
raise
self.commit()
+ def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
+ u64 = util.u64
+ self.begin()
+ try:
+ self.query('DELETE FROM obj WHERE partition=%(partition)d AND '
+ 'oid > %(oid)d OR (oid = %(oid)d AND serial > %(serial)d)' % {
+ 'partition': partition,
+ 'oid': u64(oid),
+ 'serial': u64(serial),
+ })
+ except:
+ self.rollback()
+ raise
+ self.commit()
+
def getTransaction(self, tid, all = False):
q = self.query
tid = util.u64(tid)
Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -242,7 +242,8 @@ class ReplicationHandler(EventHandler):
def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
max_tid):
ask = conn.ask
- replicator = self.app.replicator
+ app = self.app
+ replicator = app.replicator
next_tid = add64(max_tid, 1)
action, params = self._checkRange(
replicator.getTIDCheckResult(min_tid, length) == (
@@ -261,6 +262,10 @@ class ReplicationHandler(EventHandler):
else:
ask(self._doAskCheckTIDRange(min_tid, count))
if action == CHECK_DONE:
+ # Delete all transactions we might have which are beyond what peer
+ # knows.
+ app.dm.deleteTransactionsAbove(app.pt.getPartitions(),
+ replicator.getCurrentRID(), max_tid)
# If no more TID, a replication of transactions is finished.
# So start to replicate objects now.
ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
@@ -269,7 +274,8 @@ class ReplicationHandler(EventHandler):
def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
oid_checksum, max_oid, serial_checksum, max_serial):
ask = conn.ask
- replicator = self.app.replicator
+ app = self.app
+ replicator = app.replicator
next_params = (max_oid, add64(max_serial, 1))
action, params = self._checkRange(
replicator.getSerialCheckResult(min_oid, min_serial, length) == (
@@ -284,6 +290,10 @@ class ReplicationHandler(EventHandler):
((min_oid, min_serial), count) = params
ask(self._doAskCheckSerialRange(min_oid, min_serial, count))
if action == CHECK_DONE:
+ # Delete all objects we might have which are beyond what peer
+ # knows.
+ app.dm.deleteObjectsAbove(app.pt.getPartitions(),
+ replicator.getCurrentRID(), max_oid, max_serial)
# Nothing remains, so the replication for this partition is
# finished.
replicator.setReplicationDone()
Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -67,10 +67,12 @@ class StorageReplicationHandlerTests(Neo
pass
def getApp(self, conn=None, tid_check_result=(0, 0, ZERO_TID),
- serial_check_result=(0, 0, ZERO_OID, 0, ZERO_TID),
- tid_result=(),
- history_result=None,
- rid=0, critical_tid=ZERO_TID):
+ serial_check_result=(0, 0, ZERO_OID, 0, ZERO_TID),
+ tid_result=(),
+ history_result=None,
+ rid=0, critical_tid=ZERO_TID,
+ num_partitions=1,
+ ):
if history_result is None:
history_result = {}
replicator = Mock({
@@ -99,6 +101,9 @@ class StorageReplicationHandlerTests(Neo
'storeTransaction': None,
'deleteObject': None,
})
+ pt = Mock({
+ 'getPartitions': num_partitions,
+ })
return FakeApp
def _checkReplicationStarted(self, conn, rid, replicator):
@@ -360,9 +365,10 @@ class StorageReplicationHandlerTests(Neo
max_tid = self.getNextTID()
length = RANGE_LENGTH / 2
rid = 12
+ num_partitions = 13
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length - 1, 0, max_tid), rid=rid,
- conn=conn)
+ conn=conn, num_partitions=num_partitions)
handler = ReplicationHandler(app)
# Peer has the same data as we have: length, checksum and max_tid
# match.
@@ -378,6 +384,10 @@ class StorageReplicationHandlerTests(Neo
calls = app.replicator.mockGetNamedCalls('checkSerialRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+ # ...and delete partition tail
+ calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove')
+ self.assertEqual(len(calls), 1)
+ calls[0].checkArgs(num_partitions, rid, max_tid)
def test_answerCheckTIDRangeDifferentBigChunk(self):
min_tid = self.getNextTID()
@@ -514,9 +524,10 @@ class StorageReplicationHandlerTests(Neo
max_serial = self.getNextTID()
length = RANGE_LENGTH / 2
rid = 12
+ num_partitions = 13
conn = self.getFakeConnection()
app = self.getApp(serial_check_result=(length - 1, 0, max_oid, 1,
- max_serial), rid=rid, conn=conn)
+ max_serial), rid=rid, conn=conn, num_partitions=num_partitions)
handler = ReplicationHandler(app)
# Peer has the same data as we have
handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
@@ -524,6 +535,10 @@ class StorageReplicationHandlerTests(Neo
# Result: mark replication as done
self.checkNoPacketSent(conn)
self.assertTrue(app.replicator.replication_done)
+ # ...and delete partition tail
+ calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
+ self.assertEqual(len(calls), 1)
+ calls[0].checkArgs(num_partitions, rid, max_oid, max_serial)
def test_answerCheckSerialRangeDifferentBigChunk(self):
min_oid = self.getOID(1)
@@ -590,9 +605,12 @@ class StorageReplicationHandlerTests(Neo
critical_tid = self.getNextTID()
length = MIN_RANGE_LENGTH - 1
rid = 12
+ num_partitions = 13
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length - 5, 0, max_oid,
- 1, max_serial), rid=rid, conn=conn, critical_tid=critical_tid)
+ 1, max_serial), rid=rid, conn=conn, critical_tid=critical_tid,
+ num_partitions=num_partitions,
+ )
handler = ReplicationHandler(app)
# Peer has different data, and less than length
handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
@@ -611,6 +629,10 @@ class StorageReplicationHandlerTests(Neo
calls[0].checkArgs(pmin_oid, pmin_serial, pmax_serial, plength,
ppartition)
self.assertTrue(app.replicator.replication_done)
+ # ...and delete partition tail
+ calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
+ self.assertEqual(len(calls), 1)
+ calls[0].checkArgs(num_partitions, rid, max_oid, max_serial)
if __name__ == "__main__":
unittest.main()
Modified: trunk/neo/tests/storage/testStorageDBTests.py
==============================================================================
--- trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] Wed Nov 17 14:05:28 2010
@@ -334,6 +334,24 @@ class StorageDBTests(NeoUnitTestBase):
self.assertEqual(self.db.getTransaction(tid1, True), None)
self.assertEqual(self.db.getTransaction(tid2, True), None)
+ def test_deleteTransactionsAbove(self):
+ self.db.setNumPartitions(2)
+ tid1 = self.getOID(0)
+ tid2 = self.getOID(1)
+ tid3 = self.getOID(2)
+ oid1 = self.getOID(1)
+ for tid in (tid1, tid2, tid3):
+ txn, objs = self.getTransaction([oid1])
+ self.db.storeTransaction(tid, objs, txn)
+ self.db.finishTransaction(tid)
+ self.db.deleteTransactionsAbove(2, 0, tid1)
+ # Right partition, below cutoff
+ self.assertNotEqual(self.db.getTransaction(tid1, True), None)
+ # Wrong partition, above cutoff
+ self.assertNotEqual(self.db.getTransaction(tid2, True), None)
+ # Right partition, above cutoff
+ self.assertEqual(self.db.getTransaction(tid3, True), None)
+
def test_deleteObject(self):
oid1, oid2 = self.getOIDs(2)
tid1, tid2 = self.getTIDs(2)
@@ -351,6 +369,31 @@ class StorageDBTests(NeoUnitTestBase):
self.assertEqual(self.db.getObject(oid2, tid=tid2), (tid2, None) + \
objs2[1][1:])
+ def test_deleteObjectsAbove(self):
+ self.db.setNumPartitions(2)
+ tid1 = self.getOID(1)
+ tid2 = self.getOID(2)
+ tid3 = self.getOID(3)
+ oid1 = self.getOID(0)
+ oid2 = self.getOID(1)
+ oid3 = self.getOID(2)
+ for tid in (tid1, tid2, tid3):
+ txn, objs = self.getTransaction([oid1, oid2, oid3])
+ self.db.storeTransaction(tid, objs, txn)
+ self.db.finishTransaction(tid)
+ self.db.deleteObjectsAbove(2, 0, oid1, tid1)
+ # Right partition, below cutoff
+ self.assertNotEqual(self.db.getObject(oid1, tid=tid1), None)
+ # Right partition, above tid cutoff
+ self.assertEqual(self.db.getObject(oid1, tid=tid2), False)
+ self.assertEqual(self.db.getObject(oid1, tid=tid3), False)
+ # Wrong partition, above cutoff
+ self.assertNotEqual(self.db.getObject(oid2, tid=tid1), None)
+ self.assertNotEqual(self.db.getObject(oid2, tid=tid2), None)
+ self.assertNotEqual(self.db.getObject(oid2, tid=tid3), None)
+ # Right partition, above cutoff
+ self.assertEqual(self.db.getObject(oid3), None)
+
def test_getTransaction(self):
oid1, oid2 = self.getOIDs(2)
tid1, tid2 = self.getTIDs(2)
More information about the Neo-report
mailing list