[Neo-report] r1996 gregory - /trunk/neo/tests/storage/testStorageMySQLdb.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Apr 1 17:58:32 CEST 2010
Author: gregory
Date: Thu Apr 1 17:58:27 2010
New Revision: 1996
Log:
Rewrite of database manager tests.
Modified:
trunk/neo/tests/storage/testStorageMySQLdb.py
Modified: trunk/neo/tests/storage/testStorageMySQLdb.py
==============================================================================
--- trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] Thu Apr 1 17:58:27 2010
@@ -34,6 +34,7 @@
# db manager
database = '%s@%s' % (NEO_SQL_USER, NEO_SQL_DATABASE)
self.db = MySQLDatabaseManager(database)
+ self.db.setup()
def tearDown(self):
self.db.close()
@@ -43,15 +44,7 @@
call = self.db.conn.mockGetNamedCalls('query')[call]
call.checkArgs('BEGIN')
- def test_01_p64(self):
- self.assertEquals(p64(0), '\0' * 8)
- self.assertEquals(p64(1), '\0' * 7 +'\1')
-
- def test_02_u64(self):
- self.assertEquals(u64('\0' * 8), 0)
- self.assertEquals(u64('\0' * 7 + '\n'), 10)
-
- def test_03_MySQLDatabaseManagerInit(self):
+ def test_MySQLDatabaseManagerInit(self):
db = MySQLDatabaseManager('%s@%s' % (NEO_SQL_USER, NEO_SQL_DATABASE))
# init
self.assertEquals(db.db, NEO_SQL_DATABASE)
@@ -60,7 +53,7 @@
self.assertTrue(isinstance(db.conn, MySQLdb.connection))
self.assertEquals(db.isUnderTransaction(), False)
- def test_05_begin(self):
+ def test_begin(self):
# no current transaction
self.db.conn = Mock({ })
self.assertEquals(self.db.isUnderTransaction(), False)
@@ -68,14 +61,14 @@
self.checkCalledQuery(query='COMMIT')
self.assertEquals(self.db.isUnderTransaction(), True)
- def test_06_commit(self):
+ def test_commit(self):
self.db.conn = Mock()
self.db.begin()
self.db.commit()
self.assertEquals(len(self.db.conn.mockGetNamedCalls('commit')), 1)
self.assertEquals(self.db.isUnderTransaction(), False)
- def test_06_rollback(self):
+ def test_rollback(self):
# rollback called and no current transaction
self.db.conn = Mock({ })
self.db.under_transaction = True
@@ -83,7 +76,7 @@
self.assertEquals(len(self.db.conn.mockGetNamedCalls('rollback')), 1)
self.assertEquals(self.db.isUnderTransaction(), False)
- def test_07_query1(self):
+ def test_query1(self):
# fake result object
from array import array
result_object = Mock({
@@ -101,7 +94,7 @@
self.assertEquals(len(calls), 1)
calls[0].checkArgs('QUERY')
- def test_07_query2(self):
+ def test_query2(self):
# test the OperationalError exception
# fake object, raise exception during the first call
from MySQLdb import OperationalError
@@ -121,7 +114,7 @@
self.db.query('QUERY')
self.assertTrue(self.connect_called)
- def test_07_query3(self):
+ def test_query3(self):
# OperationalError > raise DatabaseFailure exception
from MySQLdb import OperationalError
class FakeConn(object):
@@ -132,11 +125,11 @@
self.db.conn = FakeConn()
self.assertRaises(DatabaseFailure, self.db.query, 'QUERY')
- def test_08_escape(self):
+ def test_escape(self):
self.assertEquals(self.db.escape('a"b'), 'a\\"b')
self.assertEquals(self.db.escape("a'b"), "a\\'b")
- def test_09_setup(self):
+ def test_setup(self):
# create all tables
self.db.conn = Mock()
self.db.setup()
@@ -148,49 +141,29 @@
calls = self.db.conn.mockGetNamedCalls('query')
self.assertEquals(len(calls), 7)
- def test_10_getConfiguration(self):
- # check if a configuration entry is well read
- self.db.setup()
- # doesn't exists, raise
- self.assertRaises(KeyError, self.db.getConfiguration, 'a')
- self.db.query("insert into config values ('a', 'b');")
- result = self.db.getConfiguration('a')
- # exists, check result
- self.assertEquals(result, 'b')
-
- def test_11_setConfiguration(self):
+ def test_configuration(self):
# check if a configuration entry is well written
- self.db.setup()
self.db.setConfiguration('a', 'c')
result = self.db.getConfiguration('a')
self.assertEquals(result, 'c')
def checkConfigEntry(self, get_call, set_call, value):
# generic test for all configuration entries accessors
- self.db.setup()
self.assertRaises(KeyError, get_call)
set_call(value)
self.assertEquals(get_call(), value)
set_call(value * 2)
self.assertEquals(get_call(), value * 2)
- def test_12_UUID(self):
- self.checkConfigEntry(
- get_call=self.db.getUUID,
- set_call=self.db.setUUID,
- value='TEST_VALUE')
-
- def test_13_NumPartitions(self):
- self.checkConfigEntry(
- get_call=self.db.getNumPartitions,
- set_call=self.db.setNumPartitions,
- value=10)
-
- def test_14_Name(self):
- self.checkConfigEntry(
- get_call=self.db.getName,
- set_call=self.db.setName,
- value='TEST_NAME')
+ def test_UUID(self):
+ self.checkConfigEntry(self.db.getUUID, self.db.setUUID, 'TEST_VALUE')
+
+ def test_NumPartitions(self):
+ self.checkConfigEntry(self.db.getNumPartitions,
+ self.db.setNumPartitions, 10)
+
+ def test_Name(self):
+ self.checkConfigEntry(self.db.getName, self.db.setName, 'TEST_NAME')
def test_15_PTID(self):
self.checkConfigEntry(
@@ -198,479 +171,401 @@
set_call=self.db.setPTID,
value=self.getPTID(1))
- def test_16_getPartitionTable(self):
- # insert an entry and check it
- self.db.setup()
- rid, uuid, state = '\x00' * 8, '\x00' * 16, 0
- self.db.query("insert into pt (rid, uuid, state) values ('%s', '%s', %d)" %
- (dump(rid), dump(uuid), state))
- pt = self.db.getPartitionTable()
- self.assertEquals(pt, [(0L, uuid, state)])
-
- def test_17_getLastOID(self):
- self.db.setup()
- an_oid = '\x01' * 8
- self.db.setLastOID(an_oid)
- result = self.db.getLastOID()
- self.assertEquals(result, an_oid)
-
- def test_18_getLastTID(self):
+ def test_getPartitionTable(self):
+ ptid = self.getPTID(1)
+ uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
+ cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
+ cell2 = (1, uuid1, CellStates.UP_TO_DATE)
+ self.db.setPartitionTable(ptid, [cell1, cell2])
+ result = self.db.getPartitionTable()
+ self.assertEqual(result, [cell1, cell2])
+
+ def test_getLastOID(self):
+ oid1 = self.getOID(1)
+ self.db.setLastOID(oid1)
+ result1 = self.db.getLastOID()
+ self.assertEquals(result1, oid1)
+
+ def getOIDs(self, count):
+ return [self.getOID(i) for i in xrange(count)]
+
+ def getTIDs(self, count):
+ tid_list = [self.getNextTID()]
+ while len(tid_list) != count:
+ tid_list.append(self.getNextTID(tid_list[-1]))
+ return tid_list
+
+ def getTransaction(self, oid_list):
+ transaction = (oid_list, 'user', 'desc', 'ext', False)
+ object_list = [(oid, 1, 0, '', None) for oid in oid_list]
+ return (transaction, object_list)
+
+ def checkSet(self, list1, list2):
+ self.assertEqual(set(list1), set(list2))
+
+ def test_getLastTID(self):
+ tid1, tid2, tid3, tid4 = self.getTIDs(4)
+ oid1, oid2 = self.getOIDs(2)
+ txn, objs = self.getTransaction([oid1, oid2])
# max TID is in obj table
- self.db.setup()
- self.db.query("""insert into trans (tid, oids, user,
- description, ext) values (1, '', '', '', '')""")
- self.db.query("""insert into trans (tid, oids, user,
- description, ext) values (2, '', '', '', '')""")
+ self.db.storeTransaction(tid1, objs, txn, False)
+ self.db.storeTransaction(tid2, objs, txn, False)
+ self.assertEqual(self.db.getLastTID(), tid2)
+ # max tid is in ttrans table
+ self.db.storeTransaction(tid3, objs, txn)
result = self.db.getLastTID()
- self.assertEquals(result, '\x00' * 7 + '\x02')
- # max tid is in ttrans table
- self.db.query("""insert into ttrans (tid, oids, user,
- description, ext) values (3, '', '', '', '')""")
- result = self.db.getLastTID()
- self.assertEquals(result, '\x00' * 7 + '\x03')
+ self.assertEqual(self.db.getLastTID(), tid3)
# max tid is in tobj (serial)
- self.db.query("""insert into tobj (oid, serial, compression,
- checksum, value) values (0, 4, 0, 0, '')""")
- result = self.db.getLastTID()
- self.assertEquals(result, '\x00' * 7 + '\x04')
-
- def test_19_getUnfinishedTIDList(self):
- self.db.setup()
- self.db.query("""insert into ttrans (tid, oids, user,
- description, ext) values (3, '', '', '', '')""")
- self.db.query("""insert into tobj (oid, serial, compression,
- checksum, value) values (0, 4, 0, 0, '')""")
- result = self.db.getUnfinishedTIDList()
- expected = ['\x00' * 7 + '\x03', '\x00' * 7 + '\x04']
- self.assertEquals(result, expected)
-
- def test_20_objectPresent(self):
- self.db.setup()
- oid1, tid1 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x01'
- oid2, tid2 = '\x00' * 7 + '\x02', '\x00' * 7 + '\x02'
- # object not present
- self.assertFalse(self.db.objectPresent(oid1, tid1, all=False))
- self.assertFalse(self.db.objectPresent(oid1, tid1, all=True))
- self.assertFalse(self.db.objectPresent(oid2, tid2, all=False))
- self.assertFalse(self.db.objectPresent(oid2, tid2, all=True))
- # object present in temp table
- self.db.query("""insert into tobj (oid, serial, compression,
- checksum, value) values (%d, %d, 0, 0, '')""" %
- (u64(oid1), u64(tid1)))
- self.assertFalse(self.db.objectPresent(oid1, tid1, all=False))
- self.assertTrue(self.db.objectPresent(oid1, tid1, all=True))
- self.assertFalse(self.db.objectPresent(oid2, tid2, all=False))
- self.assertFalse(self.db.objectPresent(oid2, tid2, all=True))
- # object present in both table
- self.db.query("""insert into obj (oid, serial, compression,
- checksum, value) values ("%s", "%s", 0, 0, '')""" %
- (u64(oid2), u64(tid2)))
- self.assertFalse(self.db.objectPresent(oid1, tid1, all=False))
- self.assertTrue(self.db.objectPresent(oid1, tid1, all=True))
- self.assertTrue(self.db.objectPresent(oid2, tid2, all=False))
- self.assertTrue(self.db.objectPresent(oid2, tid2, all=True))
-
- def test_21_getObject(self):
- self.db.setup()
- oid1, tid1 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x01'
- oid2, tid2 = '\x00' * 7 + '\x02', '\x00' * 7 + '\x02'
- # tid specified and object not present
+ self.db.storeTransaction(tid4, objs, None)
+ self.assertEqual(self.db.getLastTID(), tid4)
+
+ def test_getUnfinishedTIDList(self):
+ tid1, tid2, tid3, tid4 = self.getTIDs(4)
+ oid1, oid2 = self.getOIDs(2)
+ txn, objs = self.getTransaction([oid1, oid2])
+ # nothing pending
+ self.db.storeTransaction(tid1, objs, txn, False)
+ self.checkSet(self.db.getUnfinishedTIDList(), [])
+ # one unfinished txn
+ self.db.storeTransaction(tid2, objs, txn)
+ self.checkSet(self.db.getUnfinishedTIDList(), [tid2])
+ # no changes
+ self.db.storeTransaction(tid3, objs, None, False)
+ self.checkSet(self.db.getUnfinishedTIDList(), [tid2])
+ # a second txn known by objs only
+ self.db.storeTransaction(tid4, objs, None)
+ self.checkSet(self.db.getUnfinishedTIDList(), [tid2, tid4])
+
+ def test_objectPresent(self):
+ tid = self.getNextTID()
+ oid = self.getOID(1)
+ txn, objs = self.getTransaction([oid])
+ # not present
+ self.assertFalse(self.db.objectPresent(oid, tid, all=True))
+ self.assertFalse(self.db.objectPresent(oid, tid, all=False))
+ # available in temp table
+ self.db.storeTransaction(tid, objs, txn)
+ self.assertTrue(self.db.objectPresent(oid, tid, all=True))
+ self.assertFalse(self.db.objectPresent(oid, tid, all=False))
+ # available in both tables
+ self.db.finishTransaction(tid)
+ self.assertTrue(self.db.objectPresent(oid, tid, all=True))
+ self.assertTrue(self.db.objectPresent(oid, tid, all=False))
+
+ def test_getObject(self):
+ oid1, oid2 = self.getOIDs(2)
+ tid1, tid2 = self.getTIDs(2)
+ txn1, objs1 = self.getTransaction([oid1])
+ txn2, objs2 = self.getTransaction([oid1])
+ # non-present
+ self.assertEqual(self.db.getObject(oid1, tid1), None)
+ self.assertEqual(self.db.getObject(oid2, tid2), None)
+ # one non-committed version
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.assertEqual(self.db.getObject(oid1, tid1), None)
+ # one committed version
+ self.db.finishTransaction(tid1)
result = self.db.getObject(oid1, tid1)
- self.assertEquals(result, None)
- # tid specified and object present
- self.db.query("""insert into obj (oid, serial, compression,
- checksum, value) values (%d, %d, 0, 0, '')""" %
- (u64(oid1), u64(tid1)))
+ self.assertEqual(result, (tid1, None, 1, 0, '', None))
+ self.assertEqual(self.db.getObject(oid1, before_tid=tid1), None)
+ # two versions available, one non-committed
+ self.db.storeTransaction(tid2, objs2, txn2)
result = self.db.getObject(oid1, tid1)
- self.assertEquals(result, (tid1, None, 0, 0, '', None))
- # before_tid specified, object not present
- result = self.db.getObject(oid2, before_tid=tid2)
- self.assertEquals(result, None)
- # before_tid specified, object present, no next serial
+ self.assertEqual(result, (tid1, None, 1, 0, '', None))
+ self.assertEqual(self.db.getObject(oid1, before_tid=tid1), None)
+ # two committed versions
+ self.db.finishTransaction(tid2)
+ result = self.db.getObject(oid1, tid1)
+ self.assertEqual(result, (tid1, None, 1, 0, '', None))
+ result = self.db.getObject(oid1, tid2)
+ self.assertEqual(result, (tid2, None, 1, 0, '', None))
result = self.db.getObject(oid1, before_tid=tid2)
- self.assertEquals(result, (tid1, None, 0, 0, '', None))
- # before_tid specified, object present, next serial exists
- self.db.query("""insert into obj (oid, serial, compression,
- checksum, value) values (%d, %d, 0, 0, '')""" %
- (u64(oid1), u64(tid2)))
- result = self.db.getObject(oid1, before_tid=tid2)
- self.assertEquals(result, (tid1, tid2, 0, 0, '', None))
- # no tid specified, retreive last object transaction, object unknown
- result = self.db.getObject(oid2)
- self.assertEquals(result, None)
- # same but object found
+ self.assertEqual(result, (tid1, tid2, 1, 0, '', None))
+ # no tid specified, return the last version
result = self.db.getObject(oid1)
- self.assertEquals(result, (tid2, None, 0, 0, '', None))
-
- def test_23_changePartitionTable(self):
- # two sn, two partitions
- self.db.setup()
- ptid = '1'
- uuid1, uuid2 = '\x00' * 15 + '\x01', '\x00' * 15 + '\x02'
- cells = (
- (0, uuid1, CellStates.DISCARDED),
- (1, uuid2, CellStates.UP_TO_DATE),
- )
- self.db.setPTID(INVALID_PTID)
- # empty table -> insert for second cell
- self.db.changePartitionTable(ptid, cells)
- result = self.db.query('select rid, uuid, state from pt')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (1, dump(uuid2), 1))
- self.assertEquals(self.db.getPTID(), ptid)
- # delete previous entries for a CellStates.DISCARDEDnode
- self.db.query("delete from pt")
- args = (0, dump(uuid1), CellStates.DISCARDED)
- self.db.query('insert into pt (rid, uuid, state) values (%d, "%s", %d)' % args)
- result = self.db.query('select rid, uuid, state from pt')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (0, dump(uuid1), 4))
- self.assertEquals(self.db.getPTID(), ptid)
- self.db.changePartitionTable(ptid, cells)
- result = self.db.query('select rid, uuid, state from pt')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (1, dump(uuid2), 1))
- self.assertEquals(self.db.getPTID(), ptid)
- # raise exception (config not set), check rollback
- self.db.query("drop table config") # will generate the exception
- args = (0, uuid1, CellStates.DISCARDED)
- self.db.query('insert into pt (rid, uuid, state) values (%d, "%s", %d)' % args)
- self.assertRaises(MySQLdb.ProgrammingError,
- self.db.changePartitionTable, ptid, cells)
- result = self.db.query('select count(*) from pt where rid=0')
- self.assertEquals(len(result), 1)
-
- def test_22_setGetPartitionTable(self):
- # two sn, two partitions
- self.db.setup()
- ptid = '1'
- uuid1, uuid2 = '\x00' * 15 + '\x01', '\x00' * 15 + '\x02'
- cells = (
- (0, uuid1, CellStates.DISCARDED),
- (1, uuid2, CellStates.UP_TO_DATE),
- )
- self.db.setPTID(INVALID_PTID)
- # not empty table, reset -> clean table first
- args = (0, uuid2, CellStates.UP_TO_DATE)
- self.db.query('insert into pt (rid, uuid, state) values (%d, "%s", %d)' % args)
- self.db.setPartitionTable(ptid, cells)
- result = self.db.query('select rid, uuid, state from pt')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (1, dump(uuid2), 1))
- self.assertEquals(self.db.getPTID(), ptid)
- # delete previous entries for a CellStates.DISCARDEDnode
- self.db.query("delete from pt")
- args = (0, uuid1, CellStates.DISCARDED)
- self.db.query('insert into pt (rid, uuid, state) values (%d, "%s", %d)' % args)
- result = self.db.query('select rid, uuid, state from pt')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (0, uuid1, 4))
- self.assertEquals(self.db.getPTID(), ptid)
- self.db.setPartitionTable(ptid, cells)
- result = self.db.query('select rid, uuid, state from pt')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (1, dump(uuid2), 1))
- self.assertEquals(self.db.getPTID(), ptid)
- # raise exception (config not set), check rollback
- self.db.query("drop table config") # will generate the exception
- args = (0, uuid1, CellStates.DISCARDED)
- self.db.query('insert into pt (rid, uuid, state) values (%d, "%s", %d)' % args)
- self.assertRaises(MySQLdb.ProgrammingError,
- self.db.setPartitionTable, ptid, cells)
- result = self.db.query('select count(*) from pt where rid=0')
- self.assertEquals(len(result), 1)
-
- def test_23_dropUnfinishedData(self):
- # delete entries from tobj and ttrans
- self.db.setup()
- self.db.query("""insert into tobj (oid, serial, compression,
- checksum, value) values (0, 4, 0, 0, '')""")
- self.db.query("""insert into ttrans (tid, oids, user,
- description, ext) values (3, '', '', '', '')""")
+ self.assertEqual(result, (tid2, None, 1, 0, '', None))
+
+ def test_setPartitionTable(self):
+ ptid = self.getPTID(1)
+ uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
+ cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
+ cell2 = (1, uuid1, CellStates.UP_TO_DATE)
+ cell3 = (1, uuid1, CellStates.DISCARDED)
+ # no partition table
+ self.assertEqual(self.db.getPartitionTable(), [])
+ # set one
+ self.db.setPartitionTable(ptid, [cell1])
+ result = self.db.getPartitionTable()
+ self.assertEqual(result, [cell1])
+ # then another
+ self.db.setPartitionTable(ptid, [cell2])
+ result = self.db.getPartitionTable()
+ self.assertEqual(result, [cell2])
+ # drop discarded cells
+ self.db.setPartitionTable(ptid, [cell2, cell3])
+ result = self.db.getPartitionTable()
+ self.assertEqual(result, [])
+
+ def test_changePartitionTable(self):
+ ptid = self.getPTID(1)
+ uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
+ cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
+ cell2 = (1, uuid1, CellStates.UP_TO_DATE)
+ cell3 = (1, uuid1, CellStates.DISCARDED)
+ # no partition table
+ self.assertEqual(self.db.getPartitionTable(), [])
+ # set one
+ self.db.changePartitionTable(ptid, [cell1])
+ result = self.db.getPartitionTable()
+ self.assertEqual(result, [cell1])
+ # add more entries
+ self.db.changePartitionTable(ptid, [cell2])
+ result = self.db.getPartitionTable()
+ self.assertEqual(result, [cell1, cell2])
+ # drop discarded cells
+ self.db.changePartitionTable(ptid, [cell2, cell3])
+ result = self.db.getPartitionTable()
+ self.assertEqual(result, [cell1])
+
+ def test_dropUnfinishedData(self):
+ oid1, oid2 = self.getOIDs(2)
+ tid1, tid2 = self.getTIDs(2)
+ txn1, objs1 = self.getTransaction([oid1])
+ txn2, objs2 = self.getTransaction([oid1])
+ # nothing
+ self.assertEqual(self.db.getObject(oid1), None)
+ self.assertEqual(self.db.getObject(oid2), None)
+ self.assertEqual(self.db.getUnfinishedTIDList(), [])
+ # one is still pending
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.storeTransaction(tid2, objs2, txn2)
+ self.db.finishTransaction(tid1)
+ result = self.db.getObject(oid1)
+ self.assertEqual(result, (tid1, None, 1, 0, '', None))
+ self.assertEqual(self.db.getObject(oid2), None)
+ self.assertEqual(self.db.getUnfinishedTIDList(), [tid2])
+ # drop it
self.db.dropUnfinishedData()
- result = self.db.query('select * from tobj')
- self.assertEquals(result, ())
- result = self.db.query('select * from ttrans')
- self.assertEquals(result, ())
-
- def test_24_storeTransaction1(self):
- # data set
- tid = '\x00' * 7 + '\x01'
- oid1, oid2 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x02'
- object_list = ( (oid1, 0, 0, '', None), (oid2, 0, 0, '', None),)
- transaction = ((oid1, oid2), 'user', 'desc', 'ext', False)
- # store objects in temporary table
- self.db.setup()
- self.db.storeTransaction(tid, object_list, transaction=None, temporary=True)
- result = self.db.query('select oid, serial, compression, checksum, value from tobj')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 1, 0, 0, ''))
- self.assertEquals(result[1], (2L, 1, 0, 0, ''))
- result = self.db.query('select * from obj')
- self.assertEquals(len(result), 0)
- # and in obj table
- self.db.setup(reset=True)
- self.db.storeTransaction(tid, object_list, transaction=None, temporary=False)
- result = self.db.query('select oid, serial, compression, checksum, value from obj')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 1, 0, 0, ''))
- self.assertEquals(result[1], (2L, 1, 0, 0, ''))
- result = self.db.query('select * from tobj')
- self.assertEquals(len(result), 0)
- # store transaction in temporary table
- self.db.setup(reset=True)
- self.db.storeTransaction(tid, (), transaction=transaction, temporary=True)
- result = self.db.query('select tid, oids, user, description, ext from ttrans')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (1L, oid1 + oid2, 'user', 'desc', 'ext',) )
- result = self.db.query('select * from trans')
- self.assertEquals(len(result), 0)
- # and in trans table
- self.db.setup(reset=True)
- self.db.storeTransaction(tid, (), transaction=transaction, temporary=False)
- result = self.db.query('select tid, oids, user, description, ext from trans')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (1L, oid1 + oid2, 'user', 'desc', 'ext',))
- result = self.db.query('select * from ttrans')
- self.assertEquals(len(result), 0)
-
- def test_25_askFinishTransaction(self):
- # data set
- tid1, tid2 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x02'
- oid1, oid2 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x02'
- object_list = ( (oid1, 0, 0, '', None), (oid2, 0, 0, '', None),)
- transaction = ((oid1, oid2), 'u', 'd', 'e', False)
- self.db.setup(reset=True)
- # store two temporary transactions
- self.db.storeTransaction(tid1, object_list, transaction, temporary=True)
- self.db.storeTransaction(tid2, object_list, transaction, temporary=True)
- result = self.db.query('select count(*) from tobj')
- self.assertEquals(result[0][0], 4)
- result = self.db.query('select count(*) from ttrans')
- self.assertEquals(result[0][0], 2)
- # finish t1
- self.db.finishTransaction(tid1)
- # t1 should be finished
- result = self.db.query('select * from obj order by oid asc')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 1L, 0, 0, '', None))
- self.assertEquals(result[1], (2L, 1L, 0, 0, '', None))
- result = self.db.query('select * from trans')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (1L, 0, oid1 + oid2, 'u', 'd', 'e',))
- # t2 should stay in temporary tables
- result = self.db.query('select * from tobj order by oid asc')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 2L, 0, 0, '', None))
- self.assertEquals(result[1], (2L, 2L, 0, 0, '', None))
- result = self.db.query('select * from ttrans')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (2L, 0, oid1 + oid2, 'u', 'd', 'e',))
-
- def test_26_deleteTransaction(self):
- # data set
- tid1, tid2 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x02'
- oid1, oid2 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x02'
- object_list = ( (oid1, 0, 0, '', None), (oid2, 0, 0, '', None),)
- transaction = ((oid1, oid2), 'u', 'd', 'e', False)
- self.db.setup(reset=True)
- # store two transactions in both state
- self.db.storeTransaction(tid1, object_list, transaction, temporary=True)
- self.db.storeTransaction(tid2, object_list, transaction, temporary=True)
- self.db.storeTransaction(tid1, object_list, transaction, temporary=False)
- self.db.storeTransaction(tid2, object_list, transaction, temporary=False)
- result = self.db.query('select count(*) from tobj')
- self.assertEquals(result[0][0], 4)
- result = self.db.query('select count(*) from ttrans')
- self.assertEquals(result[0][0], 2)
- result = self.db.query('select count(*) from obj')
- self.assertEquals(result[0][0], 4)
- result = self.db.query('select count(*) from trans')
- self.assertEquals(result[0][0], 2)
- # delete t1 (all)
- self.db.deleteTransaction(tid1, all=True)
- # t2 not altered
- result = self.db.query('select * from tobj order by oid asc')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 2L, 0, 0, '', None))
- self.assertEquals(result[1], (2L, 2L, 0, 0, '', None))
- result = self.db.query('select * from ttrans')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (2L, 0, oid1 + oid2, 'u', 'd', 'e',))
- result = self.db.query('select * from obj order by oid asc')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 2L, 0, 0, '', None))
- self.assertEquals(result[1], (2L, 2L, 0, 0, '', None))
- result = self.db.query('select * from trans')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (2L, 0, oid1 + oid2, 'u', 'd', 'e',))
- # store t1 again
- self.db.storeTransaction(tid1, object_list, transaction, temporary=True)
- self.db.storeTransaction(tid1, object_list, transaction, temporary=False)
- # and remove it but only from temporary tables
- self.db.deleteTransaction(tid1, all=False)
- # t2 not altered and t1 stay in obj/trans tables
- result = self.db.query('select * from tobj order by oid asc')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 2L, 0, 0, '', None))
- self.assertEquals(result[1], (2L, 2L, 0, 0, '', None))
- result = self.db.query('select * from ttrans')
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0], (2L, 0, oid1 + oid2, 'u', 'd', 'e',))
- result = self.db.query('select * from obj order by oid, serial asc')
- self.assertEquals(len(result), 4)
- self.assertEquals(result[0], (1L, 1L, 0, 0, '', None))
- self.assertEquals(result[1], (1L, 2L, 0, 0, '', None))
- self.assertEquals(result[2], (2L, 1L, 0, 0, '', None))
- self.assertEquals(result[3], (2L, 2L, 0, 0, '', None))
- result = self.db.query('select * from trans order by tid asc')
- self.assertEquals(len(result), 2)
- self.assertEquals(result[0], (1L, 0, oid1 + oid2, 'u', 'd', 'e',))
- self.assertEquals(result[1], (2L, 0, oid1 + oid2, 'u', 'd', 'e',))
-
- def test_27_getTransaction(self):
- # data set
- tid1, tid2 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x02'
- oid1, oid2 = '\x00' * 7 + '\x01', '\x00' * 7 + '\x02'
- oids = [oid1, oid2]
- transaction = ((oid1, oid2), 'u', 'd', 'e', False)
- self.db.setup(reset=True)
- # store t1 in temporary and t2 in persistent tables
- self.db.storeTransaction(tid1, (), transaction, temporary=True)
- self.db.storeTransaction(tid2, (), transaction, temporary=False)
- # get t1 from all -> OK
- t = self.db.getTransaction(tid1, all=True)
- self.assertEquals(t, (oids, 'u', 'd', 'e', False))
- # get t1 from no tmp only -> fail
- t = self.db.getTransaction(tid1, all=False)
- self.assertEquals(t, None)
- # get t2 from all or not -> always OK
- t = self.db.getTransaction(tid2, all=True)
- self.assertEquals(t, (oids, 'u', 'd', 'e', False))
- t = self.db.getTransaction(tid2, all=False)
- self.assertEquals(t, (oids, 'u', 'd', 'e', False))
- # store wrong oids -> DatabaseFailure
- self.db.setup(reset=True)
- self.db.query("""replace into trans (tid, oids, user, description, ext)
- values ('%s', '%s', 'u', 'd', 'e')""" % (u64(tid1), 'OIDs_'))
- self.assertRaises(DatabaseFailure, self.db.getTransaction, tid1)
-
- def test_28_getOIDList(self):
- # there are two partitions and two objects in each of them
- # o1 & o3 in p1, o2 & o4 in p2
- self.db.setup(reset=True)
- tid = '\x00' * 7 + '\x01'
- oid1, oid2, oid3, oid4 = ['\x00' * 7 + chr(i) for i in xrange(4)]
- for oid in (oid1, oid2, oid3, oid4):
- self.db.query("replace into obj values (%d, %d, 0, 0, '', NULL)" %
- (u64(oid), u64(tid)))
- # get all oids for all partitions
- result = self.db.getOIDList(0, 4, 2, (0, 1))
- self.assertEquals(result, [oid4, oid3, oid2, oid1])
- # get all oids but from the second with a limit a two
- result = self.db.getOIDList(1, 2, 2, (0, 1))
- self.assertEquals(result, [oid3, oid2])
- # get all oids for the first partition
- result = self.db.getOIDList(0, 2, 2, (0, ))
- self.assertEquals(result, [oid3, oid1])
- # get all oids for the second partition with a limit of one
- result = self.db.getOIDList(0, 1, 2, (1, ))
- self.assertEquals(result, [oid4])
- # get all oids for the second partition with an offset of 3 > nothing
- result = self.db.getOIDList(3, 2, 2, (1, ))
- self.assertEquals(result, [])
- # get oids for an inexsitant partition -> nothing
- result = self.db.getOIDList(0, 2, 2, (3, ))
- self.assertEquals(result, [])
-
- def test_29_getObjectHistory(self):
- # there is one object with 4 revisions
- self.db.setup(reset=True)
- tids = ['\x00' * 7 + chr(i) for i in xrange(4)]
- oid = '\x00' * 8
- for tid in tids:
- self.db.query("replace into obj values (%d, %d, 0, 0, '', NULL)" %
- (u64(oid), u64(tid)))
- # unkwown object
- result = self.db.getObjectHistory(oid='\x01' * 8)
- self.assertEquals(result, None)
- # retreive all revisions
- result = self.db.getObjectHistory(oid=oid, offset=0, length=4)
- expected = [(tid, 0) for tid in reversed(tids)]
- self.assertEquals(result, expected)
- # get from the second, limit to 2 revisions
- result = self.db.getObjectHistory(oid=oid, offset=1, length=2)
- expected = [(tids[2], 0), (tids[1], 0)]
- self.assertEquals(result, expected)
- # get from the fifth -> nothing
- result = self.db.getObjectHistory(oid=oid, offset=4, length=2)
- self.assertEquals(result, None)
-
- def test_28_getTIDList(self):
- # same as for getOIDList, 2 partitions and 4 transactions
- self.db.setup(reset=True)
- tids = ['\x00' * 7 + chr(i) for i in xrange(4)]
- tid1, tid2, tid3, tid4 = tids
- for tid in tids:
- self.db.query("""replace into trans values (%d, '', 'u', 'd', 'e',
- False)""" % (u64(tid)))
- # get all tids for all partitions
- result = self.db.getTIDList(0, 4, 2, (0, 1))
- self.assertEquals(result, [tid4, tid3, tid2, tid1])
- # get all tids but from the second with a limit a two
- result = self.db.getTIDList(1, 2, 2, (0, 1))
- self.assertEquals(result, [tid3, tid2])
- # get all tids for the first partition
- result = self.db.getTIDList(0, 2, 2, (0, ))
- self.assertEquals(result, [tid3, tid1])
- # get all tids for the second partition with a limit of one
- result = self.db.getTIDList(0, 1, 2, (1, ))
- self.assertEquals(result, [tid4])
- # get all tids for the second partition with an offset of 3 > nothing
- result = self.db.getTIDList(3, 2, 2, (1, ))
- self.assertEquals(result, [])
- # get tids for an inexsitant partition -> nothing
- result = self.db.getTIDList(0, 2, 2, (3, ))
- self.assertEquals(result, [])
-
- def test_29_getTIDListPresent(self):
- # 4 sample transactions
- self.db.setup(reset=True)
- tid = '\x00' * 7 + '\x01'
- tid1, tid2, tid3, tid4 = ['\x00' * 7 + chr(i) for i in xrange(4)]
- for tid in (tid1, tid2, tid3, tid4):
- self.db.query("""replace into trans values (%d, '', 'u', 'd', 'e',
- False)""" % (u64(tid)))
- # all match
- result = self.db.getTIDListPresent((tid1, tid2, tid3, tid4))
- expected = [tid1, tid2, tid3, tid4]
- self.assertEquals(sorted(result), expected)
- # none match
- result = self.db.getTIDListPresent(('\x01' * 8, '\x02' * 8))
- self.assertEquals(result, [])
- # some match
- result = self.db.getTIDListPresent((tid1, tid3))
- self.assertEquals(sorted(result), [tid1, tid3])
-
- def test_30_getSerialListPresent(self):
- # 4 sample objects
- self.db.setup(reset=True)
- tids = ['\x00' * 7 + chr(i) for i in xrange(4)]
- tid1, tid2, tid3, tid4 = tids
- oid = '\x00' * 8
- for tid in tids:
- self.db.query("replace into obj values (%d, %d, 0, 0, '', NULL)" \
- % (u64(oid), u64(tid)))
- # all match
- result = self.db.getSerialListPresent(oid, tids)
- expected = list(tids)
- self.assertEquals(sorted(result), expected)
- # none match
- result = self.db.getSerialListPresent(oid, ('\x01' * 8, '\x02' * 8))
- self.assertEquals(result, [])
- # some match
- result = self.db.getSerialListPresent(oid, (tid1, tid3))
- self.assertEquals(sorted(result), [tid1, tid3])
+ self.assertEqual(self.db.getUnfinishedTIDList(), [])
+ result = self.db.getObject(oid1)
+ self.assertEqual(result, (tid1, None, 1, 0, '', None))
+ self.assertEqual(self.db.getObject(oid2), None)
+
+ def test_storeTransaction(self):
+ oid1, oid2 = self.getOIDs(2)
+ tid1, tid2 = self.getTIDs(2)
+ txn1, objs1 = self.getTransaction([oid1])
+ txn2, objs2 = self.getTransaction([oid2])
+ # nothing in database
+ self.assertEqual(self.db.getLastTID(), None)
+ self.assertEqual(self.db.getUnfinishedTIDList(), [])
+ self.assertEqual(self.db.getObject(oid1), None)
+ self.assertEqual(self.db.getObject(oid2), None)
+ self.assertEqual(self.db.getTransaction(tid1, True), None)
+ self.assertEqual(self.db.getTransaction(tid2, True), None)
+ self.assertEqual(self.db.getTransaction(tid1, False), None)
+ self.assertEqual(self.db.getTransaction(tid2, False), None)
+ # store in temporary tables
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.storeTransaction(tid2, objs2, txn2)
+ result = self.db.getTransaction(tid1, True)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid2, True)
+ self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
+ self.assertEqual(self.db.getTransaction(tid1, False), None)
+ self.assertEqual(self.db.getTransaction(tid2, False), None)
+ # commit pending transaction
+ self.db.finishTransaction(tid1)
+ self.db.finishTransaction(tid2)
+ result = self.db.getTransaction(tid1, True)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid2, True)
+ self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid1, False)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid2, False)
+ self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
+
+ def test_askFinishTransaction(self):
+ oid1, oid2 = self.getOIDs(2)
+ tid1, tid2 = self.getTIDs(2)
+ txn1, objs1 = self.getTransaction([oid1])
+ txn2, objs2 = self.getTransaction([oid2])
+ # stored but not finished
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.storeTransaction(tid2, objs2, txn2)
+ result = self.db.getTransaction(tid1, True)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid2, True)
+ self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
+ self.assertEqual(self.db.getTransaction(tid1, False), None)
+ self.assertEqual(self.db.getTransaction(tid2, False), None)
+ # stored and finished
+ self.db.finishTransaction(tid1)
+ self.db.finishTransaction(tid2)
+ result = self.db.getTransaction(tid1, True)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid2, True)
+ self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid1, False)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid2, False)
+ self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
+
+ def test_deleteTransaction(self):
+ oid1, oid2 = self.getOIDs(2)
+ tid1, tid2 = self.getTIDs(2)
+ txn1, objs1 = self.getTransaction([oid1])
+ txn2, objs2 = self.getTransaction([oid2])
+ # delete only from temporary tables
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.storeTransaction(tid2, objs2, txn2)
+ self.db.finishTransaction(tid1)
+ self.db.deleteTransaction(tid1)
+ self.db.deleteTransaction(tid2)
+ result = self.db.getTransaction(tid1, True)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ self.assertEqual(self.db.getTransaction(tid2, True), None)
+ # delete from all
+ self.db.deleteTransaction(tid1, True)
+ self.assertEqual(self.db.getTransaction(tid1, True), None)
+ self.assertEqual(self.db.getTransaction(tid2, True), None)
+
+ def test_getTransaction(self):
+ oid1, oid2 = self.getOIDs(2)
+ tid1, tid2 = self.getTIDs(2)
+ txn1, objs1 = self.getTransaction([oid1])
+ txn2, objs2 = self.getTransaction([oid2])
+ # get from temporary table or not
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.storeTransaction(tid2, objs2, txn2)
+ self.db.finishTransaction(tid1)
+ result = self.db.getTransaction(tid1, True)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ result = self.db.getTransaction(tid2, True)
+ self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
+ # get from non-temporary only
+ result = self.db.getTransaction(tid1, False)
+ self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
+ self.assertEqual(self.db.getTransaction(tid2, False), None)
+
+ def test_getOIDList(self):
+ # store four objects
+ oid1, oid2, oid3, oid4 = self.getOIDs(4)
+ tid = self.getNextTID()
+ txn, objs = self.getTransaction([oid1, oid2, oid3, oid4])
+ self.db.storeTransaction(tid, objs, txn)
+ self.db.finishTransaction(tid)
+ # get oids
+ result = self.db.getOIDList(0, 4, 1, [0])
+ self.checkSet(result, [oid1, oid2, oid3, oid4])
+ result = self.db.getOIDList(0, 4, 2, [0])
+ self.checkSet(result, [oid1, oid3])
+ result = self.db.getOIDList(0, 4, 2, [0, 1])
+ self.checkSet(result, [oid1, oid2, oid3, oid4])
+ result = self.db.getOIDList(0, 4, 3, [0])
+ self.checkSet(result, [oid1, oid4])
+ # get a subset of oids
+ result = self.db.getOIDList(2, 4, 1, [0])
+ self.checkSet(result, [oid1, oid2])
+ result = self.db.getOIDList(0, 2, 1, [0])
+ self.checkSet(result, [oid3, oid4])
+ result = self.db.getOIDList(0, 1, 3, [0])
+ self.checkSet(result, [oid4])
+
+ def test_getObjectHistory(self):
+ oid = self.getOID(1)
+ tid1, tid2, tid3 = self.getTIDs(3)
+ txn1, objs1 = self.getTransaction([oid])
+ txn2, objs2 = self.getTransaction([oid])
+ txn3, objs3 = self.getTransaction([oid])
+ # one revision
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.finishTransaction(tid1)
+ result = self.db.getObjectHistory(oid, 0, 3)
+ self.assertEqual(result, [(tid1, 0)])
+ result = self.db.getObjectHistory(oid, 1, 1)
+ self.assertEqual(result, None)
+ # two revisions
+ self.db.storeTransaction(tid2, objs2, txn2)
+ self.db.finishTransaction(tid2)
+ result = self.db.getObjectHistory(oid, 0, 3)
+ self.assertEqual(result, [(tid2, 0), (tid1, 0)])
+ result = self.db.getObjectHistory(oid, 1, 3)
+ self.assertEqual(result, [(tid1, 0)])
+ result = self.db.getObjectHistory(oid, 2, 3)
+ self.assertEqual(result, None)
+
+ def test_getTIDList(self):
+ # use OID generator to know result of tid % N
+ tid1, tid2, tid3, tid4 = self.getOIDs(4)
+ oid = self.getOID(1)
+ txn1, objs1 = self.getTransaction([oid])
+ txn2, objs2 = self.getTransaction([oid])
+ txn3, objs3 = self.getTransaction([oid])
+ txn4, objs4 = self.getTransaction([oid])
+ # store four transaction
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.storeTransaction(tid2, objs2, txn2)
+ self.db.storeTransaction(tid3, objs3, txn3)
+ self.db.storeTransaction(tid4, objs4, txn4)
+ self.db.finishTransaction(tid1)
+ self.db.finishTransaction(tid2)
+ self.db.finishTransaction(tid3)
+ self.db.finishTransaction(tid4)
+ # get tids
+ result = self.db.getTIDList(0, 4, 1, [0])
+ self.checkSet(result, [tid1, tid2, tid3, tid4])
+ result = self.db.getTIDList(0, 4, 2, [0])
+ self.checkSet(result, [tid1, tid3])
+ result = self.db.getTIDList(0, 4, 2, [0, 1])
+ self.checkSet(result, [tid1, tid2, tid3, tid4])
+ result = self.db.getTIDList(0, 4, 3, [0])
+ self.checkSet(result, [tid1, tid4])
+ # get a subset of tids
+ result = self.db.getTIDList(2, 4, 1, [0])
+ self.checkSet(result, [tid1, tid2])
+ result = self.db.getTIDList(0, 2, 1, [0])
+ self.checkSet(result, [tid3, tid4])
+ result = self.db.getTIDList(0, 1, 3, [0])
+ self.checkSet(result, [tid4])
+
+ def test_getTIDListPresent(self):
+ oid = self.getOID(1)
+ tid1, tid2, tid3, tid4 = self.getTIDs(4)
+ txn1, objs1 = self.getTransaction([oid])
+ txn4, objs4 = self.getTransaction([oid])
+ # four tids, two missing
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.finishTransaction(tid1)
+ self.db.storeTransaction(tid4, objs4, txn4)
+ self.db.finishTransaction(tid4)
+ result = self.db.getTIDListPresent([tid1, tid2, tid3, tid4])
+ self.checkSet(result, [tid1, tid4])
+ result = self.db.getTIDListPresent([tid1, tid2])
+ self.checkSet(result, [tid1])
+ self.assertEqual(self.db.getTIDListPresent([tid2, tid3]), [])
+
+ def test_getSerialListPresent(self):
+ oid1, oid2 = self.getOIDs(2)
+ tid1, tid2, tid3, tid4 = self.getTIDs(4)
+ txn1, objs1 = self.getTransaction([oid1])
+ txn2, objs2 = self.getTransaction([oid1])
+ txn3, objs3 = self.getTransaction([oid2])
+ txn4, objs4 = self.getTransaction([oid2])
+ # four object, one revision each
+ self.db.storeTransaction(tid1, objs1, txn1)
+ self.db.finishTransaction(tid1)
+ self.db.storeTransaction(tid4, objs4, txn4)
+ self.db.finishTransaction(tid4)
+ result = self.db.getSerialListPresent(oid1, [tid1, tid2])
+ self.checkSet(result, [tid1])
+ result = self.db.getSerialListPresent(oid2, [tid3, tid4])
+ self.checkSet(result, [tid4])
+ result = self.db.getSerialListPresent(oid1, [tid2])
+ self.assertEqual(result, [])
+ result = self.db.getSerialListPresent(oid2, [tid3])
+ self.assertEqual(result, [])
def test__getObjectData(self):
db = self.db
More information about the Neo-report
mailing list