[Neo-report] r2649 gregory - in /trunk/neo: lib/ master/ master/handlers/ storage/ storage...

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Feb 8 15:07:08 CET 2011


Author: gregory
Date: Tue Feb  8 15:07:07 2011
New Revision: 2649

Log:
Make replication work with temporary TIDs.

- Storage nodes start to replicate a partition when all transactions that were
pending when the outdated partition was added are committed.
- Transactions are registered by the master from the tpc_begin step.

Signed-off-by: Grégory <gregory at nexedi.com>

Modified:
    trunk/neo/lib/handler.py
    trunk/neo/lib/protocol.py
    trunk/neo/master/app.py
    trunk/neo/master/handlers/client.py
    trunk/neo/master/handlers/storage.py
    trunk/neo/master/transactions.py
    trunk/neo/master/verification.py
    trunk/neo/storage/handlers/master.py
    trunk/neo/storage/handlers/verification.py
    trunk/neo/storage/replicator.py
    trunk/neo/tests/__init__.py
    trunk/neo/tests/functional/testStorage.py
    trunk/neo/tests/master/testClientHandler.py
    trunk/neo/tests/master/testStorageHandler.py
    trunk/neo/tests/master/testTransactions.py
    trunk/neo/tests/master/testVerification.py
    trunk/neo/tests/storage/testMasterHandler.py
    trunk/neo/tests/storage/testReplicator.py
    trunk/neo/tests/storage/testVerificationHandler.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/lib/handler.py
==============================================================================
--- trunk/neo/lib/handler.py [iso-8859-1] (original)
+++ trunk/neo/lib/handler.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -184,7 +184,7 @@ class EventHandler(object):
     def askUnfinishedTransactions(self, conn):
         raise UnexpectedPacketError
 
-    def answerUnfinishedTransactions(self, conn, tid_list):
+    def answerUnfinishedTransactions(self, conn, max_tid, ttid_list):
         raise UnexpectedPacketError
 
     def askObjectPresent(self, conn, oid, tid):
@@ -229,6 +229,9 @@ class EventHandler(object):
     def notifyUnlockInformation(self, conn, ttid):
         raise UnexpectedPacketError
 
+    def notifyTransactionFinished(self, conn, ttid, max_tid):
+        raise UnexpectedPacketError
+
     def askStoreObject(self, conn, oid, serial,
             compression, checksum, data, data_serial, ttid, unlock):
         raise UnexpectedPacketError
@@ -506,6 +509,7 @@ class EventHandler(object):
         d[Packets.AnswerLastTransaction] = self.answerLastTransaction
         d[Packets.AskCheckCurrentSerial] = self.askCheckCurrentSerial
         d[Packets.AnswerCheckCurrentSerial] = self.answerCheckCurrentSerial
+        d[Packets.NotifyTransactionFinished] = self.notifyTransactionFinished
 
         return d
 

Modified: trunk/neo/lib/protocol.py
==============================================================================
--- trunk/neo/lib/protocol.py [iso-8859-1] (original)
+++ trunk/neo/lib/protocol.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -661,18 +661,18 @@ class AnswerUnfinishedTransactions(Packe
     """
     Answer unfinished transactions  S -> PM.
     """
-    _header_format = '!L'
+    _header_format = '!8sL'
     _list_entry_format = '8s'
     _list_entry_len = calcsize(_list_entry_format)
 
-    def _encode(self, tid_list):
-        body = [pack(self._header_format, len(tid_list))]
+    def _encode(self, max_tid, tid_list):
+        body = [pack(self._header_format, max_tid, len(tid_list))]
         body.extend(tid_list)
         return ''.join(body)
 
     def _decode(self, body):
         offset = self._header_len
-        (n,) = unpack(self._header_format, body[:offset])
+        (max_tid, n) = unpack(self._header_format, body[:offset])
         tid_list = []
         list_entry_format = self._list_entry_format
         list_entry_len = self._list_entry_len
@@ -681,7 +681,7 @@ class AnswerUnfinishedTransactions(Packe
             tid = unpack(list_entry_format, body[offset:next_offset])[0]
             offset = next_offset
             tid_list.append(tid)
-        return (tid_list,)
+        return (max_tid, tid_list)
 
 class AskObjectPresent(Packet):
     """
@@ -784,6 +784,18 @@ class AskFinishTransaction(Packet):
             oid_list.append(oid)
         return (tid, oid_list)
 
+class NotifyTransactionFinished(Packet):
+    """
+    Notify that a transaction blocking a replication is now finished
+    M -> S
+    """
+    def _encode(self, ttid, max_tid):
+        return _encodeTID(ttid) + _encodeTID(max_tid)
+
+    def _decode(self, body):
+        (ttid, max_tid) = unpack('8s8s', body)
+        return (ttid, max_tid)
+
 class AnswerTransactionFinished(Packet):
     """
     Answer when a transaction is finished. PM -> C.
@@ -2044,6 +2056,10 @@ class PacketRegistry(dict):
             AskCheckCurrentSerial,
             AnswerCheckCurrentSerial,
             )
+    NotifyTransactionFinished = register(
+            0x003E,
+            NotifyTransactionFinished,
+            )
 
 # build a "singleton"
 Packets = PacketRegistry()

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -585,6 +585,13 @@ class Application(object):
         for storage_uuid in txn.getUUIDList():
             getByUUID(storage_uuid).getConnection().notify(notify_unlock)
 
+        # Notify storage that have replications blocked by this transaction
+        notify_finished = Packets.NotifyTransactionFinished(ttid, tid)
+        for storage_uuid in txn.getNotificationUUIDList():
+            node = getByUUID(storage_uuid)
+            if node is not None and node.isConnected():
+                node.getConnection().notify(notify_finished)
+
         # remove transaction from manager
         self.tm.remove(transaction_node.getUUID(), ttid)
         self.setLastTransaction(tid)

Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -51,8 +51,9 @@ class ClientServiceHandler(MasterHandler
         """
             A client request a TID, nothing is kept about it until the finish.
         """
-        conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
-            conn.getUUID(), tid)))
+        app = self.app
+        node = app.nm.getByUUID(conn.getUUID())
+        conn.answer(Packets.AnswerBeginTransaction(app.tm.begin(node, tid)))
 
     def askNewOIDs(self, conn, num_oids):
         app = self.app
@@ -84,9 +85,8 @@ class ClientServiceHandler(MasterHandler
         usable_uuid_set = set((x.getUUID() for x in identified_node_list))
         partitions = app.pt.getPartitions()
         peer_id = conn.getPeerId()
-        node = app.nm.getByUUID(conn.getUUID())
-        tid = app.tm.prepare(node, ttid, partitions, oid_list,
-            usable_uuid_set, peer_id)
+        tid = app.tm.prepare(ttid, partitions, oid_list, usable_uuid_set,
+            peer_id)
 
         # check if greater and foreign OID was stored
         if app.tm.updateLastOID(oid_list):

Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -59,7 +59,10 @@ class StorageServiceHandler(BaseServiceH
         conn.answer(Packets.AnswerLastIDs(loid, ltid, app.pt.getID()))
 
     def askUnfinishedTransactions(self, conn):
-        p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
+        tm = self.app.tm
+        pending_list = tm.registerForNotification(conn.getUUID())
+        last_tid = tm.getLastTID()
+        p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
         conn.answer(p)
 
     def answerInformationLocked(self, conn, ttid):

Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -91,29 +91,31 @@ class Transaction(object):
     """
         A pending transaction
     """
+    _tid = None
+    _msg_id = None
+    _oid_list = None
+    _prepared = False
+    # uuid dict hold flag to known who has locked the transaction
+    _uuid_set = None
+    _lock_wait_uuid_set = None
 
-    def __init__(self, node, ttid, tid, oid_list, uuid_list, msg_id):
+    def __init__(self, node, ttid):
         """
             Prepare the transaction, set OIDs and UUIDs related to it
         """
         self._node = node
         self._ttid = ttid
-        self._tid = tid
-        self._oid_list = oid_list
-        self._msg_id = msg_id
-        # uuid dict hold flag to known who has locked the transaction
-        self._uuid_set = set(uuid_list)
-        self._lock_wait_uuid_set = set(uuid_list)
         self._birth = time()
-        self._prepared = False
+        # store storage uuids that must be notified at commit
+        self._notification_set = set()
 
     def __repr__(self):
         return "<%s(client=%r, tid=%r, oids=%r, storages=%r, age=%.2fs) at %x>" % (
                 self.__class__.__name__,
                 self._node,
                 dump(self._tid),
-                [dump(x) for x in self._oid_list],
-                [dump(x) for x in self._uuid_set],
+                [dump(x) for x in self._oid_list or ()],
+                [dump(x) for x in self._uuid_set or ()],
                 time() - self._birth,
                 id(self),
         )
@@ -161,6 +163,19 @@ class Transaction(object):
         """
         return self._prepared
 
+    def registerForNotification(self, uuid):
+        """
+            Register a storage node that requires a notification at commit
+        """
+        self._notification_set.add(uuid)
+
+    def getNotificationUUIDList(self):
+        """
+            Returns the list of storage waiting for the transaction to be
+            finished
+        """
+        return list(self._notification_set)
+
     def prepare(self, tid, oid_list, uuid_list, msg_id):
 
         self._tid = tid
@@ -332,31 +347,42 @@ class TransactionManager(object):
         """
         return bool(self._ttid_dict)
 
-    def getPendingList(self):
+    def registerForNotification(self, uuid):
         """
             Return the list of pending transaction IDs
         """
-        return [txn.getTID() for txn in self._ttid_dict.values()]
+        # remember that this node must be notified when pending transactions
+        # will be finished
+        for txn in self._ttid_dict.itervalues():
+            txn.registerForNotification(uuid)
+        return set(self._ttid_dict.keys())
 
-    def begin(self, uuid, tid=None):
+    def begin(self, node, tid=None):
         """
             Generate a new TID
         """
         if tid is None:
             # No TID requested, generate a temporary one
-            tid = self.getTTID()
+            ttid = self.getTTID()
         else:
             # Use of specific TID requested, queue it immediately and update
             # last TID.
-            self._queue.append((uuid, tid))
+            self._queue.append((node.getUUID(), tid))
             self.setLastTID(tid)
-        return tid
+            ttid = tid
+        txn = Transaction(node, ttid)
+        self._ttid_dict[ttid] = txn
+        self._node_dict.setdefault(node, {})[ttid] = txn
+        neo.lib.logging.debug('Begin %s for %s', txn, node)
+        return ttid
 
-    def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id):
+    def prepare(self, ttid, divisor, oid_list, uuid_list, msg_id):
         """
             Prepare a transaction to be finished
         """
         # XXX: not efficient but the list should be often small
+        txn = self._ttid_dict[ttid]
+        node = txn.getNode()
         for _, tid in self._queue:
             if ttid == tid:
                 break
@@ -365,9 +391,7 @@ class TransactionManager(object):
             self._queue.append((node.getUUID(), ttid))
         neo.lib.logging.debug('Finish TXN %s for %s (was %s)',
                         dump(tid), node, dump(ttid))
-        txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
-        self._ttid_dict[ttid] = txn
-        self._node_dict.setdefault(node, {})[ttid] = txn
+        txn.prepare(tid, oid_list, uuid_list, msg_id)
         return tid
 
     def remove(self, uuid, ttid):
@@ -383,7 +407,6 @@ class TransactionManager(object):
         ttid_dict = self._ttid_dict
         if ttid in ttid_dict:
             txn = ttid_dict[ttid]
-            tid = txn.getTID()
             node = txn.getNode()
             # ...and tried to finish
             del ttid_dict[ttid]

Modified: trunk/neo/master/verification.py
==============================================================================
--- trunk/neo/master/verification.py [iso-8859-1] (original)
+++ trunk/neo/master/verification.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -195,7 +195,7 @@ class VerificationManager(BaseServiceHan
         # approved during recovery, there is no need to check them here.
         pass
 
-    def answerUnfinishedTransactions(self, conn, tid_list):
+    def answerUnfinishedTransactions(self, conn, max_tid, tid_list):
         uuid = conn.getUUID()
         neo.lib.logging.info('got unfinished transactions %s from %r',
             [dump(tid) for tid in tid_list], conn)

Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -27,8 +27,11 @@ class MasterOperationHandler(BaseMasterH
     def answerLastIDs(self, conn, loid, ltid, lptid):
         self.app.replicator.setCriticalTID(ltid)
 
-    def answerUnfinishedTransactions(self, conn, tid_list):
-        self.app.replicator.setUnfinishedTIDList(tid_list)
+    def answerUnfinishedTransactions(self, conn, max_tid, ttid_list):
+        self.app.replicator.setUnfinishedTIDList(max_tid, ttid_list)
+
+    def notifyTransactionFinished(self, conn, ttid, max_tid):
+        self.app.replicator.transactionFinished(ttid, max_tid)
 
     def notifyPartitionChanges(self, conn, ptid, cell_list):
         """This is very similar to Send Partition Table, except that

Modified: trunk/neo/storage/handlers/verification.py
==============================================================================
--- trunk/neo/storage/handlers/verification.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/verification.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -18,7 +18,7 @@
 import neo
 
 from neo.storage.handlers import BaseMasterHandler
-from neo.lib.protocol import Packets, Errors, ProtocolError
+from neo.lib.protocol import Packets, Errors, ProtocolError, INVALID_TID
 from neo.lib.util import dump
 from neo.lib.exception import OperationFailure
 
@@ -62,7 +62,7 @@ class VerificationHandler(BaseMasterHand
 
     def askUnfinishedTransactions(self, conn):
         tid_list = self.app.dm.getUnfinishedTIDList()
-        conn.answer(Packets.AnswerUnfinishedTransactions(tid_list))
+        conn.answer(Packets.AnswerUnfinishedTransactions(INVALID_TID, tid_list))
 
     def askTransactionInformation(self, conn, tid):
         app = self.app

Modified: trunk/neo/storage/replicator.py
==============================================================================
--- trunk/neo/storage/replicator.py [iso-8859-1] (original)
+++ trunk/neo/storage/replicator.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -26,22 +26,26 @@ from neo.lib.util import dump
 class Partition(object):
     """This class abstracts the state of a partition."""
 
-    def __init__(self, offset, tid):
-        self.offset = offset
-        if tid is None:
-            tid = ZERO_TID
-        self.tid = tid
+    def __init__(self, offset, max_tid, ttid_list):
+        self._offset = offset
+        self._pending_ttid_list = ttid_list
+        # pending upper bound
+        self._critical_tid = max_tid
 
     def getOffset(self):
-        return self.offset
+        return self._offset
 
     def getCriticalTID(self):
-        return self.tid
+        return self._critical_tid
 
-    def safe(self, min_pending_tid):
-        tid = self.tid
-        return tid is not None and (
-            min_pending_tid is None or tid < min_pending_tid)
+    def transactionFinished(self, ttid, max_tid):
+        self._pending_ttid_list.remove(ttid)
+        assert max_tid is not None
+        # final upper bound
+        self._critical_tid = max_tid
+
+    def safe(self):
+        return not self._pending_ttid_list
 
 class Task(object):
     """
@@ -115,23 +119,18 @@ class Replicator(object):
           I ask only non-existing data. """
 
     # new_partition_set
-    #   outdated partitions for which no critical tid was asked to primary
-    #   master yet
-    # critical_tid_list
-    #   outdated partitions for which a critical tid was asked to primary
-    #   master, but not answered so far
+    #   outdated partitions for which no pending transactions was asked to
+    #   primary master yet
     # partition_dict
-    #   outdated partitions (with or without a critical tid - if without, it
-    #   was asked to primary master)
+    #   outdated partitions with pending transaction and temporary critical
+    #   tid
     # current_partition
     #   partition being currently synchronised
     # current_connection
     #   connection to a storage node we are replicating from
     # waiting_for_unfinished_tids
-    #   unfinished_tid_list has been asked to primary master node, but it
+    #   unfinished tids have been asked to primary master node, but it
     #   didn't answer yet.
-    # unfinished_tid_list
-    #   The list of unfinished TIDs known by master node.
     # replication_done
     #   False if we know there is something to replicate.
     #   True when current_partition is replicated, or we don't know yet if
@@ -140,13 +139,11 @@ class Replicator(object):
     current_partition = None
     current_connection = None
     waiting_for_unfinished_tids = False
-    unfinished_tid_list = None
     replication_done = True
 
     def __init__(self, app):
         self.app = app
         self.new_partition_set = set()
-        self.critical_tid_list = []
         self.partition_dict = {}
         self.task_list = []
         self.task_dict = {}
@@ -156,7 +153,6 @@ class Replicator(object):
         When connection to primary master is lost, stop waiting for unfinished
         transactions.
         """
-        self.critical_tid_list = []
         self.waiting_for_unfinished_tids = False
 
     def storageLost(self):
@@ -182,13 +178,11 @@ class Replicator(object):
         self.task_dict = {}
         self.current_partition = None
         self.current_connection = None
-        self.unfinished_tid_list = None
         self.replication_done = True
 
     def pending(self):
         """Return whether there is any pending partition."""
-        return len(self.partition_dict) or len(self.new_partition_set) \
-            or self.critical_tid_list
+        return len(self.partition_dict) or len(self.new_partition_set)
 
     def getCurrentOffset(self):
         assert self.current_partition is not None
@@ -205,25 +199,21 @@ class Replicator(object):
     def isCurrentConnection(self, conn):
         return self.current_connection is conn
 
-    def setCriticalTID(self, tid):
-        """This is a callback from MasterOperationHandler."""
-        neo.lib.logging.debug('setting critical TID %s to %s', dump(tid),
-            ', '.join([str(p) for p in self.critical_tid_list]))
-        for offset in self.critical_tid_list:
-            self.partition_dict[offset] = Partition(offset, tid)
-        self.critical_tid_list = []
-
-    def _askCriticalTID(self):
-        self.app.master_conn.ask(Packets.AskLastIDs())
-        self.critical_tid_list.extend(self.new_partition_set)
-        self.new_partition_set.clear()
-
-    def setUnfinishedTIDList(self, tid_list):
+    def setUnfinishedTIDList(self, max_tid, ttid_list):
         """This is a callback from MasterOperationHandler."""
-        neo.lib.logging.debug('setting unfinished TIDs %s',
-                      ','.join([dump(tid) for tid in tid_list]))
+        neo.lib.logging.debug('setting unfinished TTIDs %s',
+                      ','.join([dump(tid) for tid in ttid_list]))
+        # all new outdated partition must wait those ttid
+        new_partition_set = self.new_partition_set
+        while new_partition_set:
+            offset = new_partition_set.pop()
+            self.partition_dict[offset] = Partition(offset, max_tid, ttid_list)
         self.waiting_for_unfinished_tids = False
-        self.unfinished_tid_list = tid_list
+
+    def transactionFinished(self, ttid, max_tid):
+        """ Callback from MasterOperationHandler """
+        partition = self.partition_dict[self.app.pt.getPartition(ttid)]
+        partition.transactionFinished(ttid, max_tid)
 
     def _askUnfinishedTIDs(self):
         conn = self.app.master_conn
@@ -283,10 +273,6 @@ class Replicator(object):
             self.current_connection = None
 
     def act(self):
-        # If the new partition list is not empty, I must ask a critical
-        # TID to a primary master node.
-        if self.new_partition_set:
-            self._askCriticalTID()
 
         if self.current_partition is not None:
             # Don't end replication until we have received all expected
@@ -305,24 +291,22 @@ class Replicator(object):
             neo.lib.logging.debug('waiting for unfinished tids')
             return
 
-        if self.unfinished_tid_list is None:
+        if self.new_partition_set:
             # Ask pending transactions.
             neo.lib.logging.debug('asking unfinished tids')
             self._askUnfinishedTIDs()
             return
 
         # Try to select something.
-        if len(self.unfinished_tid_list):
-            min_unfinished_tid = min(self.unfinished_tid_list)
-        else:
-            min_unfinished_tid = None
         for partition in self.partition_dict.values():
-            if partition.safe(min_unfinished_tid):
+            # XXX: replication could start up to the initial critical tid, that
+            # is below the pending transactions, then finish when all pending
+            # transactions are committed.
+            if partition.safe():
                 self.current_partition = partition
                 break
         else:
             # Not yet.
-            self.unfinished_tid_list = None
             neo.lib.logging.debug('not ready yet')
             return
 

Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -283,7 +283,7 @@ class NeoUnitTestBase(NeoTestBase):
     def checkNotifyPacket(self, conn, packet_type, packet_number=0, decode=False):
         """ Check if a notify-packet with the right type is sent """
         calls = conn.mockGetNamedCalls('notify')
-        self.assertTrue(len(calls) > packet_number)
+        self.assertTrue(len(calls) > packet_number, (len(calls), packet_number))
         packet = calls[packet_number].getParam(0)
         self.assertTrue(isinstance(packet, protocol.Packet))
         self.assertEquals(packet.getType(), packet_type)
@@ -324,6 +324,9 @@ class NeoUnitTestBase(NeoTestBase):
     def checkNotifyUnlockInformation(self, conn, **kw):
         return self.checkNotifyPacket(conn, Packets.NotifyUnlockInformation, **kw)
 
+    def checkNotifyTransactionFinished(self, conn, **kw):
+        return self.checkNotifyPacket(conn, Packets.NotifyTransactionFinished, **kw)
+
     def checkRequestIdentification(self, conn, **kw):
         return self.checkAskPacket(conn, Packets.RequestIdentification, **kw)
 

Modified: trunk/neo/tests/functional/testStorage.py
==============================================================================
--- trunk/neo/tests/functional/testStorage.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/testStorage.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -15,12 +15,14 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
+import time
 import unittest
 import transaction
 from persistent import Persistent
 
 from neo.tests.functional import NEOCluster, NEOFunctionalTest
 from neo.lib.protocol import ClusterStates, NodeStates
+from ZODB.tests.StorageTestBase import zodb_pickle
 from MySQLdb import ProgrammingError
 from MySQLdb.constants.ER import NO_SUCH_TABLE
 
@@ -522,5 +524,46 @@ class StorageTests(NEOFunctionalTest):
         self.neo.expectClusterRecovering()
         self.neo.expectOudatedCells(number=10)
 
+    def testReplicationBlockedByUnfinished(self):
+        # start a cluster with 1 of 2 storages and a replica
+        (started, stopped) = self.__setup(storage_number=2, replicas=1,
+                pending_number=1, partitions=10)
+        self.neo.expectRunning(started[0])
+        self.neo.expectStorageNotKnown(stopped[0])
+        self.neo.expectOudatedCells(number=0)
+        self.neo.expectClusterRunning()
+        self.__populate()
+        self.neo.expectOudatedCells(number=0)
+
+        # start a transaction that will block the end of the replication
+        db, conn = self.neo.getZODBConnection()
+        st = conn._storage
+        t = transaction.Transaction()
+        t.user  = 'user'
+        t.description = 'desc'
+        oid = st.new_oid()
+        rev = '\0' * 8
+        data = zodb_pickle(PObject(42))
+        st.tpc_begin(t)
+        st.store(oid, rev, data, '', t)
+
+        # start the oudated storage
+        stopped[0].start()
+        self.neo.expectPending(stopped[0])
+        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
+        self.neo.expectRunning(stopped[0])
+        self.neo.expectClusterRunning()
+        self.neo.expectAssignedCells(started[0], 10)
+        self.neo.expectAssignedCells(stopped[0], 10)
+        # wait a bit, replication must not happen. This hack is required
+        # because we cannot gather informations directly from the storages
+        time.sleep(10)
+        self.neo.expectOudatedCells(number=10)
+
+        # finish the transaction, the replication must happen and finish
+        st.tpc_vote(t)
+        st.tpc_finish(t)
+        self.neo.expectOudatedCells(number=0, timeout=10)
+
 if __name__ == "__main__":
     unittest.main()

Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -74,11 +74,12 @@ class MasterClientHandlerTests(NeoUnitTe
         })
         # client call it
         client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
+        client_node = self.app.nm.getByUUID(client_uuid)
         conn = self.getFakeConnection(client_uuid, self.client_address)
         service.askBeginTransaction(conn, None)
         calls = tm.mockGetNamedCalls('begin')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(client_uuid, None)
+        calls[0].checkArgs(client_node, None)
         self.checkAnswerBeginTransaction(conn)
         # Client asks for a TID
         conn = self.getFakeConnection(client_uuid, self.client_address)
@@ -86,7 +87,7 @@ class MasterClientHandlerTests(NeoUnitTe
         service.askBeginTransaction(conn, tid1)
         calls = tm.mockGetNamedCalls('begin')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(client_uuid, None)
+        calls[0].checkArgs(client_node, None)
         args = self.checkAnswerBeginTransaction(conn, decode=True)
         self.assertEqual(args, (tid1, ))
 
@@ -142,9 +143,10 @@ class MasterClientHandlerTests(NeoUnitTe
         self.assertTrue(self.app.isStorageReady(storage_uuid))
         service.askFinishTransaction(conn, ttid, oid_list)
         self.checkAskLockInformation(storage_conn)
-        self.assertEquals(len(self.app.tm.getPendingList()), 1)
+        self.assertEquals(len(self.app.tm.registerForNotification(storage_uuid)), 1)
         txn = self.app.tm[ttid]
-        self.assertEquals(txn.getTID(), self.app.tm.getPendingList()[0])
+        pending_ttid = list(self.app.tm.registerForNotification(storage_uuid))[0]
+        self.assertEquals(ttid, pending_ttid)
         self.assertEquals(len(txn.getOIDList()), 0)
         self.assertEquals(len(txn.getUUIDList()), 1)
 

Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -101,8 +101,8 @@ class MasterStorageHandlerTests(NeoUnitT
         oid_list = self.getOID(), self.getOID()
         msg_id = 1
         # register a transaction
-        ttid = self.app.tm.begin(client_1.getUUID())
-        tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list,
+        ttid = self.app.tm.begin(client_1)
+        tid = self.app.tm.prepare(ttid, 1, oid_list, uuid_list,
             msg_id)
         self.assertTrue(ttid in self.app.tm)
         # the first storage acknowledge the lock
@@ -141,17 +141,17 @@ class MasterStorageHandlerTests(NeoUnitT
         # give a uuid
         service.askUnfinishedTransactions(conn)
         packet = self.checkAnswerUnfinishedTransactions(conn)
-        tid_list, = packet.decode()
+        max_tid, tid_list = packet.decode()
         self.assertEqual(tid_list, [])
         # create some transaction
         node, conn = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
                                                 port=self.client_port)
-        self.app.tm.prepare(node, self.getNextTID(), 1,
+        ttid = self.app.tm.begin(node)
+        self.app.tm.prepare(ttid, 1,
             [self.getOID(1)], [node.getUUID()], 1)
         conn = self.getFakeConnection(node.getUUID(), self.storage_address)
         service.askUnfinishedTransactions(conn)
-        packet = self.checkAnswerUnfinishedTransactions(conn)
-        (tid_list, ) = packet.decode()
+        max_tid, tid_list = self.checkAnswerUnfinishedTransactions(conn, decode=True)
         self.assertEqual(len(tid_list), 1)
 
     def _testWithMethod(self, method, state):
@@ -208,26 +208,28 @@ class MasterStorageHandlerTests(NeoUnitT
         # Transaction 1: 2 storage nodes involved, one will die and the other
         # already answered node lock
         msg_id_1 = 1
-        ttid1 = tm.begin(node1.getUUID())
-        tid1 = tm.prepare(client1, ttid1, 1, oid_list,
+        ttid1 = tm.begin(client1)
+        tid1 = tm.prepare(ttid1, 1, oid_list,
             [node1.getUUID(), node2.getUUID()], msg_id_1)
         tm.lock(ttid1, node2.getUUID())
+        # storage 1 request a notification at commit
+        tm. registerForNotification(node1.getUUID())
         self.checkNoPacketSent(cconn1)
         # Storage 1 dies
         node1.setTemporarilyDown()
         self.service.nodeLost(conn1, node1)
         # T1: last locking node lost, client receives AnswerTransactionFinished
         self.checkAnswerTransactionFinished(cconn1)
+        self.checkNotifyTransactionFinished(conn1)
         self.checkNotifyUnlockInformation(conn2)
-        self.checkNoPacketSent(conn1)
         # ...and notifications are sent to other clients
         self.checkInvalidateObjects(cconn2)
         self.checkInvalidateObjects(cconn3)
 
         # Transaction 2: 2 storage nodes involved, one will die
         msg_id_2 = 2
-        ttid2 = tm.begin(node1.getUUID())
-        tid2 = tm.prepare(client2, ttid2, 1, oid_list,
+        ttid2 = tm.begin(node1)
+        tid2 = tm.prepare(ttid2, 1, oid_list,
             [node1.getUUID(), node2.getUUID()], msg_id_2)
         # T2: pending locking answer, client keeps waiting
         self.checkNoPacketSent(cconn2, check_notify=False)
@@ -235,8 +237,8 @@ class MasterStorageHandlerTests(NeoUnitT
 
         # Transaction 3: 1 storage node involved, which won't die
         msg_id_3 = 3
-        ttid3 = tm.begin(node1.getUUID())
-        tid3 = tm.prepare(client3, ttid3, 1, oid_list,
+        ttid3 = tm.begin(node1)
+        tid3 = tm.prepare(ttid3, 1, oid_list,
             [node2.getUUID(), ], msg_id_3)
         # T3: action not significant to this transacion, so no response
         self.checkNoPacketSent(cconn3, check_notify=False)

Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -37,7 +37,7 @@ class testTransactionManager(NeoUnitTest
 
     def makeNode(self, i):
         uuid = self.makeUUID(i)
-        node = Mock({'getUUID': uuid, '__hash__': 0})
+        node = Mock({'getUUID': uuid, '__hash__': i, '__repr__': 'FakeNode'})
         return uuid, node
 
     def testTransaction(self):
@@ -49,7 +49,8 @@ class testTransactionManager(NeoUnitTest
         uuid_list = (uuid1, uuid2) = [self.makeUUID(1), self.makeUUID(2)]
         msg_id = 1
         # create transaction object
-        txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
+        txn = Transaction(node, ttid)
+        txn.prepare(tid, oid_list, uuid_list, msg_id)
         self.assertEqual(txn.getUUIDList(), uuid_list)
         self.assertEqual(txn.getOIDList(), oid_list)
         # lock nodes one by one
@@ -69,16 +70,16 @@ class testTransactionManager(NeoUnitTest
         callback = Mock()
         txnman = TransactionManager(on_commit=callback)
         self.assertFalse(txnman.hasPending())
-        self.assertEqual(txnman.getPendingList(), [])
+        self.assertEqual(txnman.registerForNotification(uuid1), set())
         # begin the transaction
-        ttid = txnman.begin(client_uuid)
+        ttid = txnman.begin(node)
         self.assertTrue(ttid is not None)
-        self.assertFalse(txnman.hasPending())
-        self.assertEqual(len(txnman.getPendingList()), 0)
+        self.assertEqual(len(txnman.registerForNotification(uuid1)), 1)
+        self.assertTrue(txnman.hasPending())
         # prepare the transaction
-        tid = txnman.prepare(node, ttid, 1, oid_list, uuid_list, msg_id)
+        tid = txnman.prepare(ttid, 1, oid_list, uuid_list, msg_id)
         self.assertTrue(txnman.hasPending())
-        self.assertEqual(txnman.getPendingList()[0], tid)
+        self.assertEqual(txnman.registerForNotification(uuid1), set([ttid]))
         txn = txnman[ttid]
         self.assertEqual(txn.getTID(), tid)
         self.assertEqual(txn.getUUIDList(), list(uuid_list))
@@ -90,30 +91,30 @@ class testTransactionManager(NeoUnitTest
         self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
         # transaction finished
         txnman.remove(client_uuid, ttid)
-        self.assertEqual(txnman.getPendingList(), [])
+        self.assertEqual(txnman.registerForNotification(uuid1), set())
 
     def testAbortFor(self):
-        node1 = Mock({'__hash__': 1})
-        node2 = Mock({'__hash__': 2})
         oid_list = [self.makeOID(1), ]
-        storage_1_uuid = self.makeUUID(1)
-        storage_2_uuid = self.makeUUID(2)
-        client_uuid = self.makeUUID(3)
+        storage_1_uuid, node1 = self.makeNode(1)
+        storage_2_uuid, node2 = self.makeNode(2)
+        client_uuid, client = self.makeNode(3)
         txnman = TransactionManager(lambda tid, txn: None)
         # register 4 transactions made by two nodes
-        self.assertEqual(txnman.getPendingList(), [])
-        ttid1 = txnman.begin(client_uuid)
-        tid1 = txnman.prepare(node1, ttid1, 1, oid_list, [storage_1_uuid], 1)
-        self.assertEqual(txnman.getPendingList(), [tid1])
+        self.assertEqual(txnman.registerForNotification(storage_1_uuid), set())
+        ttid1 = txnman.begin(client)
+        tid1 = txnman.prepare(ttid1, 1, oid_list, [storage_1_uuid], 1)
+        self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1]))
         # abort transactions of another node, transaction stays
         txnman.abortFor(node2)
-        self.assertEqual(txnman.getPendingList(), [tid1])
-        # abort transactions of requesting node, transaction is removed
+        self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1]))
+        # abort transactions of requesting node, transaction is not removed
+        # because the transaction is prepared and must remain until the end of
+        # the 2PC
         txnman.abortFor(node1)
-        self.assertEqual(txnman.getPendingList(), [])
-        self.assertFalse(txnman.hasPending())
+        self.assertEqual(txnman.registerForNotification(storage_1_uuid), set([ttid1]))
+        self.assertTrue(txnman.hasPending())
         # ...and the lock is available
-        txnman.begin(client_uuid, self.getNextTID())
+        txnman.begin(client, self.getNextTID())
 
     def test_getNextOIDList(self):
         txnman = TransactionManager(lambda tid, txn: None)
@@ -141,8 +142,8 @@ class testTransactionManager(NeoUnitTest
         # Transaction 1: 2 storage nodes involved, one will die and the other
         # already answered node lock
         msg_id_1 = 1
-        ttid1 = tm.begin(client_uuid)
-        tid1 = tm.prepare(client1, ttid1, 1, oid_list,
+        ttid1 = tm.begin(client1)
+        tid1 = tm.prepare(ttid1, 1, oid_list,
             [storage_1_uuid, storage_2_uuid], msg_id_1)
         tm.lock(ttid1, storage_2_uuid)
         t1 = tm[ttid1]
@@ -155,8 +156,8 @@ class testTransactionManager(NeoUnitTest
 
         # Transaction 2: 2 storage nodes involved, one will die
         msg_id_2 = 2
-        ttid2 = tm.begin(client_uuid)
-        tid2 = tm.prepare(client2, ttid2, 1, oid_list,
+        ttid2 = tm.begin(client2)
+        tid2 = tm.prepare(ttid2, 1, oid_list,
             [storage_1_uuid, storage_2_uuid], msg_id_2)
         t2 = tm[ttid2]
         self.assertFalse(t2.locked())
@@ -169,8 +170,8 @@ class testTransactionManager(NeoUnitTest
 
         # Transaction 3: 1 storage node involved, which won't die
         msg_id_3 = 3
-        ttid3 = tm.begin(client_uuid)
-        tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ],
+        ttid3 = tm.begin(client3)
+        tid3 = tm.prepare(ttid3, 1, oid_list, [storage_2_uuid, ],
             msg_id_3)
         t3 = tm[ttid3]
         self.assertFalse(t3.locked())
@@ -213,29 +214,28 @@ class testTransactionManager(NeoUnitTest
         strictly increasing order.
         Note: this implementation might change later, to allow more paralelism.
         """
-        client_uuid = self.makeUUID(3)
+        client_uuid, client = self.makeNode(1)
         tm = TransactionManager(lambda tid, txn: None)
         # With a requested TID, lock spans from begin to remove
         ttid1 = self.getNextTID()
         ttid2 = self.getNextTID()
-        tid1 = tm.begin(client_uuid, ttid1)
+        tid1 = tm.begin(client, ttid1)
         self.assertEqual(tid1, ttid1)
         tm.remove(client_uuid, tid1)
         # Without a requested TID, lock spans from prepare to remove only
-        ttid3 = tm.begin(client_uuid)
-        ttid4 = tm.begin(client_uuid) # Doesn't raise
+        ttid3 = tm.begin(client)
+        ttid4 = tm.begin(client) # Doesn't raise
         node = Mock({'getUUID': client_uuid, '__hash__': 0})
-        tid4 = tm.prepare(node, ttid4, 1, [], [], 0)
+        tid4 = tm.prepare(ttid4, 1, [], [], 0)
         tm.remove(client_uuid, tid4)
-        tm.prepare(node, ttid3, 1, [], [], 0)
+        tm.prepare(ttid3, 1, [], [], 0)
 
     def testClientDisconectsAfterBegin(self):
-        client1_uuid = self.makeUUID(1)
+        client_uuid1, node1 = self.makeNode(1)
         tm = TransactionManager(lambda tid, txn: None)
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
-        tm.begin(client1_uuid, tid1)
-        node1 = Mock({'getUUID': client1_uuid, '__hash__': 0})
+        tm.begin(node1, tid1)
         tm.abortFor(node1)
         self.assertTrue(tid1 not in tm)
 
@@ -245,10 +245,10 @@ class testTransactionManager(NeoUnitTest
         uuid2, node2 = self.makeNode(2)
         storage_uuid = self.makeUUID(3)
         tm = TransactionManager(callback)
-        ttid1 = tm.begin(uuid1)
-        ttid2 = tm.begin(uuid2)
-        tid1 = tm.prepare(node1, ttid1, 1, [], [storage_uuid], 0)
-        tid2 = tm.prepare(node2, ttid2, 1, [], [storage_uuid], 0)
+        ttid1 = tm.begin(node1)
+        ttid2 = tm.begin(node2)
+        tid1 = tm.prepare(ttid1, 1, [], [storage_uuid], 0)
+        tid2 = tm.prepare(ttid2, 1, [], [storage_uuid], 0)
         tm.lock(ttid2, storage_uuid)
         # txn 2 is still blocked by txn 1
         self.assertEqual(len(callback.getNamedCalls('__call__')), 0)

Modified: trunk/neo/tests/master/testVerification.py
==============================================================================
--- trunk/neo/tests/master/testVerification.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testVerification.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -124,14 +124,14 @@ class MasterVerificationTests(NeoUnitTes
         self.assertEquals(len(self.verification._uuid_set), 0)
         self.assertEquals(len(self.verification._tid_set), 0)
         new_tid = self.getNextTID()
-        verification.answerUnfinishedTransactions(conn, [new_tid])
+        verification.answerUnfinishedTransactions(conn, new_tid, [new_tid])
         self.assertEquals(len(self.verification._tid_set), 0)
         # update dict
         conn = self.getFakeConnection(uuid, self.storage_address)
         self.verification._uuid_set.add(uuid)
         self.assertEquals(len(self.verification._tid_set), 0)
         new_tid = self.getNextTID(new_tid)
-        verification.answerUnfinishedTransactions(conn, [new_tid,])
+        verification.answerUnfinishedTransactions(conn, new_tid, [new_tid])
         self.assertTrue(uuid not in self.verification._uuid_set)
         self.assertEquals(len(self.verification._tid_set), 1)
         self.assertTrue(new_tid in self.verification._tid_set)

Modified: trunk/neo/tests/storage/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -190,11 +190,12 @@ class StorageMasterHandlerTests(NeoUnitT
         self.app.replicator = Mock()
         self.operation.answerUnfinishedTransactions(
             conn=conn,
-            tid_list=(INVALID_TID, ),
+            max_tid=INVALID_TID,
+            ttid_list=(INVALID_TID, ),
         )
         calls = self.app.replicator.mockGetNamedCalls('setUnfinishedTIDList')
         self.assertEquals(len(calls), 1)
-        calls[0].checkArgs((INVALID_TID, ))
+        calls[0].checkArgs(INVALID_TID, (INVALID_TID, ))
 
     def test_askPack(self):
         self.app.dm = Mock({'pack': None})

Modified: trunk/neo/tests/storage/testReplicator.py
==============================================================================
--- trunk/neo/tests/storage/testReplicator.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicator.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -40,7 +40,6 @@ class StorageReplicatorTests(NeoUnitTest
         })
         replicator = Replicator(app)
         self.assertEqual(replicator.new_partition_set, set())
-        replicator.replication_done = False
         replicator.populate()
         self.assertEqual(replicator.new_partition_set, set([0]))
 
@@ -50,40 +49,32 @@ class StorageReplicatorTests(NeoUnitTest
         replicator.task_dict = {'foo': 'bar'}
         replicator.current_partition = 'foo'
         replicator.current_connection = 'foo'
-        replicator.unfinished_tid_list = ['foo']
         replicator.replication_done = 'foo'
         replicator.reset()
         self.assertEqual(replicator.task_list, [])
         self.assertEqual(replicator.task_dict, {})
         self.assertEqual(replicator.current_partition, None)
         self.assertEqual(replicator.current_connection, None)
-        self.assertEqual(replicator.unfinished_tid_list, None)
         self.assertTrue(replicator.replication_done)
 
     def test_setCriticalTID(self):
-        replicator = Replicator(None)
         critical_tid = self.getNextTID()
-        partition = Partition(0, critical_tid)
+        partition = Partition(0, critical_tid, [])
         self.assertEqual(partition.getCriticalTID(), critical_tid)
-
-    def test_setUnfinishedTIDList(self):
-        replicator = Replicator(None)
-        replicator.waiting_for_unfinished_tids = True
-        assert replicator.unfinished_tid_list is None, \
-            replicator.unfinished_tid_list
-        tid_list = [self.getNextTID(), ]
-        replicator.setUnfinishedTIDList(tid_list)
-        self.assertEqual(replicator.unfinished_tid_list, tid_list)
-        self.assertFalse(replicator.waiting_for_unfinished_tids)
+        self.assertEqual(partition.getOffset(), 0)
 
     def test_act(self):
         # Also tests "pending"
         uuid = self.getNewUUID()
         master_uuid = self.getNewUUID()
-        bad_unfinished_tid = self.getNextTID()
-        critical_tid = self.getNextTID()
-        unfinished_tid = self.getNextTID()
+        critical_tid_0 = self.getNextTID()
+        critical_tid_1 = self.getNextTID()
+        critical_tid_2 = self.getNextTID()
+        unfinished_ttid_1 = self.getOID(1)
+        unfinished_ttid_2 = self.getOID(2)
         app = Mock()
+        app.server = ('127.0.0.1', 10000)
+        app.name = 'fake cluster'
         app.em = Mock({
             'register': None,
         })
@@ -105,6 +96,7 @@ class StorageReplicatorTests(NeoUnitTest
         app.pt = Mock({
             'getCellList': [running_cell, unknown_cell],
             'getOutdatedOffsetListFor': [0],
+            'getPartition': 0,
         })
         node_conn_handler = Mock({
             'startReplication': None,
@@ -119,37 +111,28 @@ class StorageReplicatorTests(NeoUnitTest
             app.master_conn = self.getFakeConnection(uuid=master_uuid)
             self.assertTrue(replicator.pending())
             replicator.act()
-        # ask last IDs to infer critical_tid and unfinished tids
+        # ask unfinished tids
         act()
-        last_ids, unfinished_tids = [x.getParam(0) for x in \
-            app.master_conn.mockGetNamedCalls('ask')]
-        self.assertEqual(last_ids.getType(), Packets.AskLastIDs)
-        self.assertFalse(replicator.new_partition_set)
-        self.assertEqual(unfinished_tids.getType(),
-            Packets.AskUnfinishedTransactions)
+        unfinished_tids = app.master_conn.mockGetNamedCalls('ask')[0].getParam(0)
+        self.assertTrue(replicator.new_partition_set)
+        self.assertEqual(unfinished_tids.getType(), Packets.AskUnfinishedTransactions)
         self.assertTrue(replicator.waiting_for_unfinished_tids)
         # nothing happens until waiting_for_unfinished_tids becomes False
         act()
         self.checkNoPacketSent(app.master_conn)
         self.assertTrue(replicator.waiting_for_unfinished_tids)
-        # Send answers (garanteed to happen in this order)
-        replicator.setCriticalTID(critical_tid)
-        act()
-        self.checkNoPacketSent(app.master_conn)
-        self.assertTrue(replicator.waiting_for_unfinished_tids)
         # first time, there is an unfinished tid before critical tid,
         # replication cannot start, and unfinished TIDs are asked again
-        replicator.setUnfinishedTIDList([unfinished_tid, bad_unfinished_tid])
+        replicator.setUnfinishedTIDList(critical_tid_0,
+            [unfinished_ttid_1, unfinished_ttid_2])
         self.assertFalse(replicator.waiting_for_unfinished_tids)
         # Note: detection that nothing can be replicated happens on first call
         # and unfinished tids are asked again on second call. This is ok, but
         # might change, so just call twice.
         act()
+        replicator.transactionFinished(unfinished_ttid_1, critical_tid_1)
         act()
-        self.checkAskPacket(app.master_conn, Packets.AskUnfinishedTransactions)
-        self.assertTrue(replicator.waiting_for_unfinished_tids)
-        # this time, critical tid check should be satisfied
-        replicator.setUnfinishedTIDList([unfinished_tid, ])
+        replicator.transactionFinished(unfinished_ttid_2, critical_tid_2)
         replicator.current_connection = node_conn
         act()
         self.assertEqual(replicator.current_partition,
@@ -174,8 +157,6 @@ class StorageReplicatorTests(NeoUnitTest
             'isPending': False,
         })
         act()
-        # unfinished tid list will not be asked again
-        self.assertTrue(replicator.unfinished_tid_list)
         # also, replication is over
         self.assertFalse(replicator.pending())
 

Modified: trunk/neo/tests/storage/testVerificationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testVerificationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testVerificationHandler.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -161,7 +161,7 @@ class StorageVerificationHandlerTests(Ne
         })
         conn = self.getMasterConnection()
         self.verification.askUnfinishedTransactions(conn)
-        (tid_list, ) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
+        (max_tid, tid_list) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
         self.assertEqual(len(tid_list), 0)
         call_list = self.app.dm.mockGetNamedCalls('getUnfinishedTIDList')
         self.assertEqual(len(call_list), 1)
@@ -173,7 +173,7 @@ class StorageVerificationHandlerTests(Ne
         })
         conn = self.getMasterConnection()
         self.verification.askUnfinishedTransactions(conn)
-        (tid_list, ) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
+        (max_tid, tid_list) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
         self.assertEqual(len(tid_list), 1)
         self.assertEqual(u64(tid_list[0]), 4)
 

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Feb  8 15:07:07 2011
@@ -195,13 +195,15 @@ class ProtocolTests(NeoUnitTestBase):
         self.assertEqual(p.decode(), ())
 
     def test_27_answerUnfinishedTransaction(self):
+        tid = self.getNextTID()
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
         tid3 = self.getNextTID()
         tid4 = self.getNextTID()
         tid_list = [tid1, tid2, tid3, tid4]
-        p = Packets.AnswerUnfinishedTransactions(tid_list)
-        p_tid_list  = p.decode()[0]
+        p = Packets.AnswerUnfinishedTransactions(tid, tid_list)
+        p_tid, p_tid_list  = p.decode()
+        self.assertEqual(p_tid, tid)
         self.assertEqual(p_tid_list, tid_list)
 
     def test_28_askObjectPresent(self):




More information about the Neo-report mailing list