[Neo-report] r2221 vincent - in /trunk/neo: ./ storage/ storage/database/ storage/handlers...

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Aug 24 18:02:16 CEST 2010


Author: vincent
Date: Tue Aug 24 18:02:15 2010
New Revision: 2221

Log:
Improve replication SQL queries.

Providing a boundary value is more efficient than providing a row count range.
This fixes replication on partitions with a large number of objects, revisions
or transactions: query time is now constant, whereas it used to increase,
causing timeouts whenever query duration exceeded ping time + ping timeout
(currently 11s).

Modified:
    trunk/neo/handler.py
    trunk/neo/protocol.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/database/mysqldb.py
    trunk/neo/storage/handlers/__init__.py
    trunk/neo/storage/handlers/client.py
    trunk/neo/storage/handlers/replication.py
    trunk/neo/storage/handlers/storage.py
    trunk/neo/storage/replicator.py
    trunk/neo/tests/__init__.py
    trunk/neo/tests/storage/testStorageHandler.py
    trunk/neo/tests/storage/testStorageMySQLdb.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -256,6 +256,12 @@ class EventHandler(object):
     def answerTIDs(self, conn, tid_list):
         raise UnexpectedPacketError
 
+    def askTIDsFrom(self, conn, min_tid, length, partition):
+        raise UnexpectedPacketError
+
+    def answerTIDsFrom(self, conn, tid_list):
+        raise UnexpectedPacketError
+
     def askTransactionInformation(self, conn, tid):
         raise UnexpectedPacketError
 
@@ -269,7 +275,13 @@ class EventHandler(object):
     def answerObjectHistory(self, conn, oid, history_list):
         raise UnexpectedPacketError
 
-    def askOIDs(self, conn, first, last, partition):
+    def askObjectHistoryFrom(self, conn, oid, min_serial, length):
+        raise UnexpectedPacketError
+
+    def answerObjectHistoryFrom(self, conn, oid, history_list):
+        raise UnexpectedPacketError
+
+    def askOIDs(self, conn, min_oid, length, partition):
         raise UnexpectedPacketError
 
     def answerOIDs(self, conn, oid_list):
@@ -414,11 +426,15 @@ class EventHandler(object):
         d[Packets.AnswerObject] = self.answerObject
         d[Packets.AskTIDs] = self.askTIDs
         d[Packets.AnswerTIDs] = self.answerTIDs
+        d[Packets.AskTIDsFrom] = self.askTIDsFrom
+        d[Packets.AnswerTIDsFrom] = self.answerTIDsFrom
         d[Packets.AskTransactionInformation] = self.askTransactionInformation
         d[Packets.AnswerTransactionInformation] = \
             self.answerTransactionInformation
         d[Packets.AskObjectHistory] = self.askObjectHistory
         d[Packets.AnswerObjectHistory] = self.answerObjectHistory
+        d[Packets.AskObjectHistoryFrom] = self.askObjectHistoryFrom
+        d[Packets.AnswerObjectHistoryFrom] = self.answerObjectHistoryFrom
         d[Packets.AskOIDs] = self.askOIDs
         d[Packets.AnswerOIDs] = self.answerOIDs
         d[Packets.AskPartitionList] = self.askPartitionList

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -109,6 +109,8 @@ INVALID_OID = '\xff' * 8
 INVALID_PTID = '\0' * 8
 INVALID_SERIAL = INVALID_TID
 INVALID_PARTITION = 0xffffffff
+ZERO_TID = '\0' * 8
+ZERO_OID = '\0' * 8
 OID_LEN = len(INVALID_OID)
 
 UUID_NAMESPACES = {
@@ -1024,7 +1026,7 @@ class AnswerObject(Packet):
 class AskTIDs(Packet):
     """
     Ask for TIDs between a range of offsets. The order of TIDs is descending,
-    and the range is [first, last). C, S -> S.
+    and the range is [first, last). C -> S.
     """
     _header_format = '!QQL'
 
@@ -1036,7 +1038,7 @@ class AskTIDs(Packet):
 
 class AnswerTIDs(Packet):
     """
-    Answer the requested TIDs. S -> C, S.
+    Answer the requested TIDs. S -> C.
     """
     _header_format = '!L'
     _list_entry_format = '8s'
@@ -1060,6 +1062,25 @@ class AnswerTIDs(Packet):
             tid_list.append(tid)
         return (tid_list,)
 
+class AskTIDsFrom(Packet):
+    """
+    Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
+    S -> S.
+    """
+    _header_format = '!8sLL'
+
+    def _encode(self, min_tid, length, partition):
+        return pack(self._header_format, min_tid, length, partition)
+
+    def _decode(self, body):
+        return unpack(self._header_format, body) # min_tid, length, partition
+
+class AnswerTIDsFrom(AnswerTIDs):
+    """
+    Answer the requested TIDs. S -> S
+    """
+    pass
+
 class AskTransactionInformation(Packet):
     """
     Ask information about a transaction. Any -> S.
@@ -1105,7 +1126,7 @@ class AnswerTransactionInformation(Packe
 class AskObjectHistory(Packet):
     """
     Ask history information for a given object. The order of serials is
-    descending, and the range is [first, last]. C, S -> S.
+    descending, and the range is [first, last]. C -> S.
     """
     _header_format = '!8sQQ'
 
@@ -1118,7 +1139,7 @@ class AskObjectHistory(Packet):
 
 class AnswerObjectHistory(Packet):
     """
-    Answer history information (serial, size) for an object. S -> C, S.
+    Answer history information (serial, size) for an object. S -> C.
     """
     _header_format = '!8sL'
     _list_entry_format = '!8sL'
@@ -1144,18 +1165,40 @@ class AnswerObjectHistory(Packet):
             history_list.append((serial, size))
         return (oid, history_list)
 
+class AskObjectHistoryFrom(Packet):
+    """
+    Ask history information for a given object. The order of serials is
+    ascending, and starts at (or above) min_serial. S -> S.
+    """
+    _header_format = '!8s8sL'
+
+    def _encode(self, oid, min_serial, length):
+        return pack(self._header_format, oid, min_serial, length)
+
+    def _decode(self, body):
+        return unpack(self._header_format, body) # oid, min_serial, length
+
+class AnswerObjectHistoryFrom(AskFinishTransaction):
+    """
+    Answer the requested serials. S -> S.
+    """
+    # This is similar to AskFinishTransaction as TID size is identical to OID
+    # size:
+    # - we have a single OID (TID in AskFinishTransaction)
+    # - we have a list of TIDs (OIDs in AskFinishTransaction)
+    pass
+
 class AskOIDs(Packet):
     """
-    Ask for OIDs between a range of offsets. The order of OIDs is descending,
-    and the range is [first, last). S -> S.
+    Ask for length OIDs starting at min_oid. S -> S.
     """
-    _header_format = '!QQL'
+    _header_format = '!8sLL'
 
-    def _encode(self, first, last, partition):
-        return pack(self._header_format, first, last, partition)
+    def _encode(self, min_oid, length, partition):
+        return pack(self._header_format, min_oid, length, partition)
 
     def _decode(self, body):
-        return unpack(self._header_format, body) # first, last, partition
+        return unpack(self._header_format, body) # min_oid, length, partition
 
 class AnswerOIDs(Packet):
     """
@@ -1787,6 +1830,14 @@ class PacketRegistry(dict):
             0x0034,
             AskHasLock,
             AnswerHasLock)
+    AskTIDsFrom, AnswerTIDsFrom = register(
+            0x0035,
+            AskTIDsFrom,
+            AnswerTIDsFrom)
+    AskObjectHistoryFrom, AnswerObjectHistoryFrom = register(
+            0x0036,
+            AskObjectHistoryFrom,
+            AnswerObjectHistoryFrom)
 
 # build a "singleton"
 Packets = PacketRegistry()

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -263,8 +263,8 @@ class DatabaseManager(object):
         area as well."""
         raise NotImplementedError
 
-    def getOIDList(self, offset, length, num_partitions, partition_list):
-        """Return a list of OIDs in descending order from an offset,
+    def getOIDList(self, min_oid, length, num_partitions, partition_list):
+        """Return a list of OIDs in ascending order from a minimal oid,
         at most the specified length. The list of partitions are passed
         to filter out non-applicable TIDs."""
         raise NotImplementedError
@@ -276,15 +276,20 @@ class DatabaseManager(object):
         If there is no such object ID in a database, return None."""
         raise NotImplementedError
 
+    def getObjectHistoryFrom(self, oid, min_serial, length):
+        """Return a list of length serials for a given object ID at (or above)
+        min_serial, sorted in ascending order."""
+        raise NotImplementedError
+
     def getTIDList(self, offset, length, num_partitions, partition_list):
         """Return a list of TIDs in ascending order from an offset,
         at most the specified length. The list of partitions are passed
         to filter out non-applicable TIDs."""
         raise NotImplementedError
 
-    def getReplicationTIDList(self, offset, length, num_partitions,
+    def getReplicationTIDList(self, min_tid, length, num_partitions,
         partition_list):
-        """Return a list of TIDs in descending order from an offset,
+        """Return a list of TIDs in ascending order from an initial tid value,
         at most the specified length. The list of partitions are passed
         to filter out non-applicable TIDs."""
         raise NotImplementedError

Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -618,12 +618,18 @@ class MySQLDatabaseManager(DatabaseManag
             return oid_list, user, desc, ext, bool(packed)
         return None
 
-    def getOIDList(self, offset, length, num_partitions, partition_list):
+    def getOIDList(self, min_oid, length, num_partitions,
+            partition_list):
         q = self.query
-        r = q("""SELECT DISTINCT oid FROM obj WHERE MOD(oid, %d) in (%s)
-                    ORDER BY oid DESC LIMIT %d,%d""" \
-                % (num_partitions, ','.join([str(p) for p in partition_list]),
-                   offset, length))
+        r = q("""SELECT DISTINCT oid FROM obj WHERE
+                    MOD(oid, %(num_partitions)d) in (%(partitions)s)
+                    AND oid >= %(min_oid)d
+                    ORDER BY oid ASC LIMIT %(length)d""" % {
+            'num_partitions': num_partitions,
+            'partitions': ','.join([str(p) for p in partition_list]),
+            'min_oid': util.u64(min_oid),
+            'length': length,
+        })
         return [util.p64(t[0]) for t in r]
 
     def _getObjectLength(self, oid, value_serial):
@@ -662,6 +668,19 @@ class MySQLDatabaseManager(DatabaseManag
             return result
         return None
 
+    def getObjectHistoryFrom(self, oid, min_serial, length):
+        q = self.query
+        oid = util.u64(oid)
+        p64 = util.p64
+        r = q("""SELECT serial FROM obj
+                    WHERE oid = %(oid)d AND serial >= %(min_serial)d
+                    ORDER BY serial ASC LIMIT %(length)d""" % {
+            'oid': oid,
+            'min_serial': util.u64(min_serial),
+            'length': length,
+        })
+        return [p64(t[0]) for t in r]
+
     def getTIDList(self, offset, length, num_partitions, partition_list):
         q = self.query
         r = q("""SELECT tid FROM trans WHERE MOD(tid, %d) in (%s)
@@ -671,13 +690,18 @@ class MySQLDatabaseManager(DatabaseManag
                    offset, length))
         return [util.p64(t[0]) for t in r]
 
-    def getReplicationTIDList(self, offset, length, num_partitions, partition_list):
+    def getReplicationTIDList(self, min_tid, length, num_partitions,
+            partition_list):
         q = self.query
-        r = q("""SELECT tid FROM trans WHERE MOD(tid, %d) in (%s)
-                    ORDER BY tid ASC LIMIT %d,%d""" \
-                % (num_partitions,
-                   ','.join([str(p) for p in partition_list]),
-                   offset, length))
+        r = q("""SELECT tid FROM trans WHERE
+                    MOD(tid, %(num_partitions)d) in (%(partitions)s)
+                    AND tid >= %(min_tid)d
+                    ORDER BY tid ASC LIMIT %(length)d""" % {
+            'num_partitions': num_partitions,
+            'partitions': ','.join([str(p) for p in partition_list]),
+            'min_tid': util.u64(min_tid),
+            'length': length,
+        })
         return [util.p64(t[0]) for t in r]
 
     def getTIDListPresent(self, tid_list):

Modified: trunk/neo/storage/handlers/__init__.py
==============================================================================
--- trunk/neo/storage/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/__init__.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -65,16 +65,6 @@ class BaseMasterHandler(EventHandler):
 class BaseClientAndStorageOperationHandler(EventHandler):
     """ Accept requests common to client and storage nodes """
 
-    def askObjectHistory(self, conn, oid, first, last):
-        if first >= last:
-            raise protocol.ProtocolError( 'invalid offsets')
-
-        app = self.app
-        history_list = app.dm.getObjectHistory(oid, first, last - first)
-        if history_list is None:
-            history_list = []
-        conn.answer(Packets.AnswerObjectHistory(oid, history_list))
-
     def askTransactionInformation(self, conn, tid):
         app = self.app
         t = app.dm.getTransaction(tid)

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -144,3 +144,13 @@ class ClientOperationHandler(BaseClientA
             state = LockState.GRANTED_TO_OTHER
         conn.answer(Packets.AnswerHasLock(oid, state))
 
+    def askObjectHistory(self, conn, oid, first, last):
+        if first >= last:
+            raise protocol.ProtocolError( 'invalid offsets')
+
+        app = self.app
+        history_list = app.dm.getObjectHistory(oid, first, last - first)
+        if history_list is None:
+            history_list = []
+        conn.answer(Packets.AnswerObjectHistory(oid, history_list))
+

Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -19,7 +19,8 @@
 from neo import logging
 
 from neo.handler import EventHandler
-from neo.protocol import Packets
+from neo.protocol import Packets, ZERO_TID, ZERO_OID
+from neo import util
 
 def checkConnectionIsReplicatorConnection(func):
     def decorator(self, conn, *args, **kw):
@@ -31,6 +32,10 @@ def checkConnectionIsReplicatorConnectio
         return result
     return decorator
 
+def add64(packed, offset):
+    """Add a python number to a 64-bits packed value"""
+    return util.p64(util.u64(packed) + offset)
+
 class ReplicationHandler(EventHandler):
     """This class handles events for replications."""
 
@@ -48,7 +53,7 @@ class ReplicationHandler(EventHandler):
         conn.setUUID(uuid)
 
     @checkConnectionIsReplicatorConnection
-    def answerTIDs(self, conn, tid_list):
+    def answerTIDsFrom(self, conn, tid_list):
         app = self.app
         if tid_list:
             # If I have pending TIDs, check which TIDs I don't have, and
@@ -59,18 +64,15 @@ class ReplicationHandler(EventHandler):
                 conn.ask(Packets.AskTransactionInformation(tid), timeout=300)
 
             # And, ask more TIDs.
-            app.replicator.tid_offset += 1000
-            offset = app.replicator.tid_offset
-            p = Packets.AskTIDs(offset, offset + 1000,
+            p = Packets.AskTIDsFrom(add64(tid_list[-1], 1), 1000,
                       app.replicator.current_partition.getRID())
             conn.ask(p, timeout=300)
         else:
             # If no more TID, a replication of transactions is finished.
             # So start to replicate objects now.
-            p = Packets.AskOIDs(0, 1000,
+            p = Packets.AskOIDs(ZERO_OID, 1000,
                       app.replicator.current_partition.getRID())
             conn.ask(p, timeout=300)
-            app.replicator.oid_offset = 0
 
     @checkConnectionIsReplicatorConnection
     def answerTransactionInformation(self, conn, tid,
@@ -84,10 +86,11 @@ class ReplicationHandler(EventHandler):
     def answerOIDs(self, conn, oid_list):
         app = self.app
         if oid_list:
+            app.replicator.next_oid = add64(oid_list[-1], 1)
             # Pick one up, and ask the history.
             oid = oid_list.pop()
-            conn.ask(Packets.AskObjectHistory(oid, 0, 1000), timeout=300)
-            app.replicator.serial_offset = 0
+            conn.ask(Packets.AskObjectHistoryFrom(oid, ZERO_TID, 1000),
+                timeout=300)
             app.replicator.oid_list = oid_list
         else:
             # Nothing remains, so the replication for this partition is
@@ -95,34 +98,29 @@ class ReplicationHandler(EventHandler):
             app.replicator.replication_done = True
 
     @checkConnectionIsReplicatorConnection
-    def answerObjectHistory(self, conn, oid, history_list):
+    def answerObjectHistoryFrom(self, conn, oid, serial_list):
         app = self.app
-        if history_list:
+        if serial_list:
             # Check if I have objects, request those which I don't have.
-            serial_list = [t[0] for t in history_list]
             present_serial_list = app.dm.getSerialListPresent(oid, serial_list)
             serial_set = set(serial_list) - set(present_serial_list)
             for serial in serial_set:
                 conn.ask(Packets.AskObject(oid, serial, None), timeout=300)
 
             # And, ask more serials.
-            app.replicator.serial_offset += 1000
-            offset = app.replicator.serial_offset
-            p = Packets.AskObjectHistory(oid, offset, offset + 1000)
-            conn.ask(p, timeout=300)
+            conn.ask(Packets.AskObjectHistoryFrom(oid,
+                add64(serial_list[-1], 1), 1000), timeout=300)
         else:
             # This OID is finished. So advance to next.
             oid_list = app.replicator.oid_list
             if oid_list:
                 # If I have more pending OIDs, pick one up.
                 oid = oid_list.pop()
-                conn.ask(Packets.AskObjectHistory(oid, 0, 1000), timeout=300)
-                app.replicator.serial_offset = 0
+                conn.ask(Packets.AskObjectHistoryFrom(oid, ZERO_TID, 1000),
+                    timeout=300)
             else:
                 # Otherwise, acquire more OIDs.
-                app.replicator.oid_offset += 1000
-                offset = app.replicator.oid_offset
-                p = Packets.AskOIDs(offset, offset + 1000,
+                p = Packets.AskOIDs(app.replicator.next_oid, 1000,
                           app.replicator.current_partition.getRID())
                 conn.ask(p, timeout=300)
 

Modified: trunk/neo/storage/handlers/storage.py
==============================================================================
--- trunk/neo/storage/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/storage.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -30,36 +30,34 @@ class StorageOperationHandler(BaseClient
         tid = app.dm.getLastTID()
         conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
 
-    def askOIDs(self, conn, first, last, partition):
+    def askOIDs(self, conn, min_oid, length, partition):
         # This method is complicated, because I must return OIDs only
         # about usable partitions assigned to me.
-        if first >= last:
-            raise protocol.ProtocolError('invalid offsets')
-
         app = self.app
-
         if partition == protocol.INVALID_PARTITION:
             partition_list = app.pt.getAssignedPartitionList(app.uuid)
         else:
             partition_list = [partition]
 
-        oid_list = app.dm.getOIDList(first, last - first,
+        oid_list = app.dm.getOIDList(min_oid, length,
                                      app.pt.getPartitions(), partition_list)
         conn.answer(Packets.AnswerOIDs(oid_list))
 
-    def askTIDs(self, conn, first, last, partition):
+    def askTIDsFrom(self, conn, min_tid, length, partition):
         # This method is complicated, because I must return TIDs only
         # about usable partitions assigned to me.
-        if first >= last:
-            raise protocol.ProtocolError('invalid offsets')
-
         app = self.app
         if partition == protocol.INVALID_PARTITION:
             partition_list = app.pt.getAssignedPartitionList(app.uuid)
         else:
             partition_list = [partition]
 
-        tid_list = app.dm.getReplicationTIDList(first, last - first,
+        tid_list = app.dm.getReplicationTIDList(min_tid, length,
                              app.pt.getPartitions(), partition_list)
-        conn.answer(Packets.AnswerTIDs(tid_list))
+        conn.answer(Packets.AnswerTIDsFrom(tid_list))
+
+    def askObjectHistoryFrom(self, conn, oid, min_serial, length):
+        app = self.app
+        history_list = app.dm.getObjectHistoryFrom(oid, min_serial, length)
+        conn.answer(Packets.AnswerObjectHistoryFrom(oid, history_list))
 

Modified: trunk/neo/storage/replicator.py
==============================================================================
--- trunk/neo/storage/replicator.py [iso-8859-1] (original)
+++ trunk/neo/storage/replicator.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -19,7 +19,7 @@ from neo import logging
 from random import choice
 
 from neo.storage.handlers import replication
-from neo.protocol import NodeTypes, NodeStates, CellStates, Packets
+from neo.protocol import NodeTypes, NodeStates, CellStates, Packets, ZERO_TID
 from neo.connection import ClientConnection
 from neo.util import dump
 
@@ -38,7 +38,7 @@ class Partition(object):
 
     def setCriticalTID(self, tid):
         if tid is None:
-            tid = '\x00' * 8
+            tid = ZERO_TID
         self.tid = tid
 
     def safe(self, min_pending_tid):
@@ -81,7 +81,6 @@ class Replicator(object):
         self.app = app
         self.new_partition_dict = self._getOutdatedPartitionList()
         self.critical_tid_dict = {}
-        self.tid_offset = 0
         self.reset()
 
     def reset(self):
@@ -172,8 +171,8 @@ class Replicator(object):
                     app.uuid, app.server, app.name)
             self.current_connection.ask(p)
 
-        self.tid_offset = 0
-        p = Packets.AskTIDs(0, 1000, self.current_partition.getRID())
+        p = Packets.AskTIDsFrom(ZERO_TID, 1000,
+            self.current_partition.getRID())
         self.current_connection.ask(p, timeout=300)
 
         self.replication_done = False

Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -364,9 +364,15 @@ class NeoTestBase(unittest.TestCase):
     def checkAnswerTids(self, conn, **kw):
         return self.checkAnswerPacket(conn, Packets.AnswerTIDs, **kw)
 
+    def checkAnswerTidsFrom(self, conn, **kw):
+        return self.checkAnswerPacket(conn, Packets.AnswerTIDsFrom, **kw)
+
     def checkAnswerObjectHistory(self, conn, **kw):
         return self.checkAnswerPacket(conn, Packets.AnswerObjectHistory, **kw)
 
+    def checkAnswerObjectHistoryFrom(self, conn, **kw):
+        return self.checkAnswerPacket(conn, Packets.AnswerObjectHistoryFrom, **kw)
+
     def checkAnswerStoreTransaction(self, conn, **kw):
         return self.checkAnswerPacket(conn, Packets.AnswerStoreTransaction, **kw)
 

Modified: trunk/neo/tests/storage/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -113,28 +113,19 @@ class StorageStorageHandlerTests(NeoTest
         self.assertEquals(len(self.app.event_queue), 0)
         self.checkAnswerObject(conn)
 
-    def test_25_askTIDs1(self):
-        # invalid offsets => error
-        app = self.app
-        app.pt = Mock()
-        app.dm = Mock()
-        conn = self.getFakeConnection()
-        self.checkProtocolErrorRaised(self.operation.askTIDs, conn, 1, 1, None)
-        self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
-        self.assertEquals(len(app.dm.mockGetNamedCalls('getReplicationTIDList')), 0)
-
-    def test_25_askTIDs2(self):
+    def test_25_askTIDsFrom1(self):
         # well case => answer
         conn = self.getFakeConnection()
         self.app.dm = Mock({'getReplicationTIDList': (INVALID_TID, )})
         self.app.pt = Mock({'getPartitions': 1})
-        self.operation.askTIDs(conn, 1, 2, 1)
+        tid = self.getNextTID()
+        self.operation.askTIDsFrom(conn, tid, 2, 1)
         calls = self.app.dm.mockGetNamedCalls('getReplicationTIDList')
         self.assertEquals(len(calls), 1)
-        calls[0].checkArgs(1, 1, 1, [1, ])
-        self.checkAnswerTids(conn)
+        calls[0].checkArgs(tid, 2, 1, [1, ])
+        self.checkAnswerTidsFrom(conn)
 
-    def test_25_askTIDs3(self):
+    def test_25_askTIDsFrom2(self):
         # invalid partition => answer usable partitions
         conn = self.getFakeConnection()
         cell = Mock({'getUUID':self.app.uuid})
@@ -144,59 +135,39 @@ class StorageStorageHandlerTests(NeoTest
             'getPartitions': 1,
             'getAssignedPartitionList': [0],
         })
-        self.operation.askTIDs(conn, 1, 2, INVALID_PARTITION)
+        tid = self.getNextTID()
+        self.operation.askTIDsFrom(conn, tid, 2, INVALID_PARTITION)
         self.assertEquals(len(self.app.pt.mockGetNamedCalls('getAssignedPartitionList')), 1)
         calls = self.app.dm.mockGetNamedCalls('getReplicationTIDList')
         self.assertEquals(len(calls), 1)
-        calls[0].checkArgs(1, 1, 1, [0, ])
-        self.checkAnswerTids(conn)
+        calls[0].checkArgs(tid, 2, 1, [0, ])
+        self.checkAnswerTidsFrom(conn)
 
-    def test_26_askObjectHistory1(self):
-        # invalid offsets => error
-        app = self.app
-        app.dm = Mock()
-        conn = self.getFakeConnection()
-        self.checkProtocolErrorRaised(self.operation.askObjectHistory, conn, 
-            1, 1, None)
-        self.assertEquals(len(app.dm.mockGetNamedCalls('getObjectHistory')), 0)
-
-    def test_26_askObjectHistory2(self):
-        oid1 = self.getOID(1)
-        oid2 = self.getOID(2)
+    def test_26_askObjectHistoryFrom(self):
+        oid = self.getOID(2)
+        min_tid = self.getNextTID()
         tid = self.getNextTID()
-        # first case: empty history
-        conn = self.getFakeConnection()
-        self.app.dm = Mock({'getObjectHistory': None})
-        self.operation.askObjectHistory(conn, oid1, 1, 2)
-        self.checkAnswerObjectHistory(conn)
-        # second case: not empty history
         conn = self.getFakeConnection()
-        self.app.dm = Mock({'getObjectHistory': [(tid, 0, ), ]})
-        self.operation.askObjectHistory(conn, oid2, 1, 2)
-        self.checkAnswerObjectHistory(conn)
+        self.app.dm = Mock({'getObjectHistoryFrom': [tid]})
+        self.operation.askObjectHistoryFrom(conn, oid, min_tid, 2)
+        self.checkAnswerObjectHistoryFrom(conn)
+        calls = self.app.dm.mockGetNamedCalls('getObjectHistoryFrom')
+        self.assertEquals(len(calls), 1)
+        calls[0].checkArgs(oid, min_tid, 2)
 
     def test_25_askOIDs1(self):
-        # invalid offsets => error
-        app = self.app
-        app.pt = Mock()
-        app.dm = Mock()
-        conn = self.getFakeConnection()
-        self.checkProtocolErrorRaised(self.operation.askOIDs, conn, 1, 1, None)
-        self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
-        self.assertEquals(len(app.dm.mockGetNamedCalls('getOIDList')), 0)
-
-    def test_25_askOIDs2(self):
         # well case > answer OIDs
         conn = self.getFakeConnection()
         self.app.pt = Mock({'getPartitions': 1})
         self.app.dm = Mock({'getOIDList': (INVALID_OID, )})
-        self.operation.askOIDs(conn, 1, 2, 1)
+        oid = self.getOID(1)
+        self.operation.askOIDs(conn, oid, 2, 1)
         calls = self.app.dm.mockGetNamedCalls('getOIDList')
         self.assertEquals(len(calls), 1)
-        calls[0].checkArgs(1, 1, 1, [1, ])
+        calls[0].checkArgs(oid, 2, 1, [1, ])
         self.checkAnswerOids(conn)
 
-    def test_25_askOIDs3(self):
+    def test_25_askOIDs2(self):
         # invalid partition => answer usable partitions
         conn = self.getFakeConnection()
         cell = Mock({'getUUID':self.app.uuid})
@@ -206,11 +177,12 @@ class StorageStorageHandlerTests(NeoTest
             'getPartitions': 1,
             'getAssignedPartitionList': [0],
         })
-        self.operation.askOIDs(conn, 1, 2, INVALID_PARTITION)
+        oid = self.getOID(1)
+        self.operation.askOIDs(conn, oid, 2, INVALID_PARTITION)
         self.assertEquals(len(self.app.pt.mockGetNamedCalls('getAssignedPartitionList')), 1)
         calls = self.app.dm.mockGetNamedCalls('getOIDList')
         self.assertEquals(len(calls), 1)
-        calls[0].checkArgs(1, 1, 1, [0])
+        calls[0].checkArgs(oid, 2, 1, [0])
         self.checkAnswerOids(conn)
 
 

Modified: trunk/neo/tests/storage/testStorageMySQLdb.py
==============================================================================
--- trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -457,20 +457,20 @@ class StorageMySQSLdbTests(NeoTestBase):
         self.db.storeTransaction(tid, objs, txn)
         self.db.finishTransaction(tid)
         # get oids
-        result = self.db.getOIDList(0, 4, 1, [0])
+        result = self.db.getOIDList(oid1, 4, 1, [0])
         self.checkSet(result, [oid1, oid2, oid3, oid4])
-        result = self.db.getOIDList(0, 4, 2, [0])
+        result = self.db.getOIDList(oid1, 4, 2, [0])
         self.checkSet(result, [oid1, oid3])
-        result = self.db.getOIDList(0, 4, 2, [0, 1])
+        result = self.db.getOIDList(oid1, 4, 2, [0, 1])
         self.checkSet(result, [oid1, oid2, oid3, oid4])
-        result = self.db.getOIDList(0, 4, 3, [0])
+        result = self.db.getOIDList(oid1, 4, 3, [0])
         self.checkSet(result, [oid1, oid4])
         # get a subset of oids
-        result = self.db.getOIDList(2, 4, 1, [0])
+        result = self.db.getOIDList(oid1, 2, 1, [0])
         self.checkSet(result, [oid1, oid2])
-        result = self.db.getOIDList(0, 2, 1, [0])
+        result = self.db.getOIDList(oid3, 2, 1, [0])
         self.checkSet(result, [oid3, oid4])
-        result = self.db.getOIDList(0, 1, 3, [0])
+        result = self.db.getOIDList(oid2, 1, 3, [0])
         self.checkSet(result, [oid4])
 
     def test_getObjectHistory(self):
@@ -496,23 +496,18 @@ class StorageMySQSLdbTests(NeoTestBase):
         result = self.db.getObjectHistory(oid, 2, 3)
         self.assertEqual(result, None)
 
-    def test_getTIDList(self):
+    def _storeTransactions(self, count):
         # use OID generator to know result of tid % N
-        tid1, tid2, tid3, tid4 = self.getOIDs(4)
+        tid_list = self.getOIDs(count)
         oid = self.getOID(1)
-        txn1, objs1 = self.getTransaction([oid])
-        txn2, objs2 = self.getTransaction([oid])
-        txn3, objs3 = self.getTransaction([oid])
-        txn4, objs4 = self.getTransaction([oid])
-        # store four transaction
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)
-        self.db.storeTransaction(tid3, objs3, txn3)
-        self.db.storeTransaction(tid4, objs4, txn4)
-        self.db.finishTransaction(tid1)
-        self.db.finishTransaction(tid2)
-        self.db.finishTransaction(tid3)
-        self.db.finishTransaction(tid4)
+        for tid in tid_list:
+            txn, objs = self.getTransaction([oid])
+            self.db.storeTransaction(tid, objs, txn)
+            self.db.finishTransaction(tid)
+        return tid_list
+
+    def test_getTIDList(self):
+        tid1, tid2, tid3, tid4 = self._storeTransactions(4)
         # get tids
         result = self.db.getTIDList(0, 4, 1, [0])
         self.checkSet(result, [tid1, tid2, tid3, tid4])
@@ -530,6 +525,25 @@ class StorageMySQSLdbTests(NeoTestBase):
         result = self.db.getTIDList(0, 1, 3, [0])
         self.checkSet(result, [tid4])
 
+    def test_getReplicationTIDList(self):
+        tid1, tid2, tid3, tid4 = self._storeTransactions(4)
+        # get tids
+        result = self.db.getReplicationTIDList(tid1, 4, 1, [0])
+        self.checkSet(result, [tid1, tid2, tid3, tid4])
+        result = self.db.getReplicationTIDList(tid1, 4, 2, [0])
+        self.checkSet(result, [tid1, tid3])
+        result = self.db.getReplicationTIDList(tid1, 4, 2, [0, 1])
+        self.checkSet(result, [tid1, tid2, tid3, tid4])
+        result = self.db.getReplicationTIDList(tid1, 4, 3, [0])
+        self.checkSet(result, [tid1, tid4])
+        # get a subset of tids
+        result = self.db.getReplicationTIDList(tid3, 4, 1, [0])
+        self.checkSet(result, [tid3, tid4])
+        result = self.db.getReplicationTIDList(tid1, 2, 1, [0])
+        self.checkSet(result, [tid1, tid2])
+        result = self.db.getReplicationTIDList(tid1, 1, 3, [1])
+        self.checkSet(result, [tid2])
+
     def test_getTIDListPresent(self):
         oid = self.getOID(1)
         tid1, tid2, tid3, tid4 = self.getTIDs(4)

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Aug 24 18:02:15 2010
@@ -269,13 +269,16 @@ class ProtocolTests(NeoTestBase):
         self.assertEqual(p_oid_list, oid_list)
 
     def test_36_askFinishTransaction(self):
+        self._testXIDAndYIDList(Packets.AskFinishTransaction)
+
+    def _testXIDAndYIDList(self, packet):
         oid1 = self.getNextTID()
         oid2 = self.getNextTID()
         oid3 = self.getNextTID()
         oid4 = self.getNextTID()
         tid = self.getNextTID()
         oid_list = [oid1, oid2, oid3, oid4]
-        p = Packets.AskFinishTransaction(tid, oid_list)
+        p = packet(tid, oid_list)
         p_tid, p_oid_list  = p.decode()
         self.assertEqual(p_tid, tid)
         self.assertEqual(p_oid_list, oid_list)
@@ -404,12 +407,15 @@ class ProtocolTests(NeoTestBase):
         self.assertEqual(partition, 5)
 
     def test_50_answerTIDs(self):
+        self._test_AnswerTIDs(Packets.AnswerTIDs)
+
+    def _test_AnswerTIDs(self, packet):
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
         tid3 = self.getNextTID()
         tid4 = self.getNextTID()
         tid_list = [tid1, tid2, tid3, tid4]
-        p = Packets.AnswerTIDs(tid_list)
+        p = packet(tid_list)
         p_tid_list  = p.decode()[0]
         self.assertEqual(p_tid_list, tid_list)
 
@@ -457,10 +463,11 @@ class ProtocolTests(NeoTestBase):
         self.assertEqual(oid, poid)
 
     def test_55_askOIDs(self):
-        p = Packets.AskOIDs(1, 10, 5)
-        first, last, partition = p.decode()
-        self.assertEqual(first, 1)
-        self.assertEqual(last, 10)
+        oid = self.getOID(1)
+        p = Packets.AskOIDs(oid, 1000, 5)
+        min_oid, length, partition = p.decode()
+        self.assertEqual(min_oid, oid)
+        self.assertEqual(length, 1000)
         self.assertEqual(partition, 5)
 
     def test_56_answerOIDs(self):
@@ -602,6 +609,30 @@ class ProtocolTests(NeoTestBase):
         msg = 'test'
         self.assertEqual(Packets.Notify(msg).decode(), (msg, ))
 
+    def test_AskTIDsFrom(self):
+        tid = self.getNextTID()
+        p = Packets.AskTIDsFrom(tid, 1000, 5)
+        min_tid, length, partition = p.decode()
+        self.assertEqual(min_tid, tid)
+        self.assertEqual(length, 1000)
+        self.assertEqual(partition, 5)
+
+    def test_AnswerTIDsFrom(self):
+        self._test_AnswerTIDs(Packets.AnswerTIDsFrom)
+
+    def test_AskObjectHistoryFrom(self):
+        oid = self.getOID(1)
+        min_serial = self.getNextTID()
+        length = 5
+        p = Packets.AskObjectHistoryFrom(oid, min_serial, length)
+        p_oid, p_min_serial, p_length = p.decode()
+        self.assertEqual(p_oid, oid)
+        self.assertEqual(p_min_serial, min_serial)
+        self.assertEqual(p_length, length)
+
+    def test_AnswerObjectHistoryFrom(self):
+        self._testXIDAndYIDList(Packets.AnswerObjectHistoryFrom)
+
 if __name__ == '__main__':
     unittest.main()
 





More information about the Neo-report mailing list