[Neo-report] r2550 gregory - in /trunk/neo: ./ client/ client/handlers/ storage/handlers/ ...

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Dec 15 18:23:37 CET 2010


Author: gregory
Date: Wed Dec 15 18:23:37 2010
New Revision: 2550

Log:
undoLog is broken, make the iterator use a workaround.

undoLog doesn't work when first is non-zero, this breaks iterator and
cannot be fixed for undoLog at the moment.

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/handlers/storage.py
    trunk/neo/client/iterator.py
    trunk/neo/protocol.py
    trunk/neo/storage/handlers/client.py
    trunk/neo/storage/handlers/replication.py
    trunk/neo/storage/handlers/storage.py
    trunk/neo/tests/storage/testReplicationHandler.py
    trunk/neo/tests/storage/testStorageHandler.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -1127,7 +1127,32 @@ class Application(object):
         for k, v in loads(extension).items():
             txn_info[k] = v
 
-    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
+    def _getTransactionInformation(self, tid):
+        cell_list = self._getCellListForTID(tid, readable=True)
+        shuffle(cell_list)
+        cell_list.sort(key=self.cp.getCellSortKey)
+        for cell in cell_list:
+            conn = self.cp.getConnForCell(cell)
+            if conn is not None:
+                self.local_var.txn_info = 0
+                self.local_var.txn_ext = 0
+                try:
+                    self._askStorage(conn,
+                            Packets.AskTransactionInformation(tid))
+                except ConnectionClosed:
+                    continue
+                if isinstance(self.local_var.txn_info, dict):
+                    break
+        if self.local_var.txn_info in (-1, 0):
+            # TID not found at all
+            raise NeoException, 'Data inconsistency detected: ' \
+                                'transaction info for TID %r could not ' \
+                                'be found' % (tid, )
+        return (self.local_var.txn_info, self.local_var.txn_ext)
+
+
+    def undoLog(self, first, last, filter=None, block=0):
+        # XXX: undoLog is broken
         if last < 0:
             # See FileStorage.py for explanation
             last = first - last
@@ -1161,51 +1186,51 @@ class Application(object):
         undo_info = []
         append = undo_info.append
         for tid in ordered_tids:
-            cell_list = self._getCellListForTID(tid, readable=True)
-            shuffle(cell_list)
-            cell_list.sort(key=self.cp.getCellSortKey)
-            for cell in cell_list:
-                conn = self.cp.getConnForCell(cell)
-                if conn is not None:
-                    self.local_var.txn_info = 0
-                    self.local_var.txn_ext = 0
-                    try:
-                        self._askStorage(conn,
-                                Packets.AskTransactionInformation(tid))
-                    except ConnectionClosed:
-                        continue
-                    if isinstance(self.local_var.txn_info, dict):
-                        break
-
-            if self.local_var.txn_info in (-1, 0):
-                # TID not found at all
-                raise NeoException, 'Data inconsistency detected: ' \
-                                    'transaction info for TID %r could not ' \
-                                    'be found' % (tid, )
-
+            (txn_info, txn_ext) = self._getTransactionInformation(tid)
             if filter is None or filter(self.local_var.txn_info):
                 txn_info = self.local_var.txn_info
                 txn_info.pop('packed')
-                if not with_oids:
-                    txn_info.pop("oids")
-                    self._insertMetadata(txn_info, self.local_var.txn_ext)
-                else:
-                    txn_info['ext'] = loads(self.local_var.txn_ext)
+                txn_info.pop("oids")
+                self._insertMetadata(txn_info, self.local_var.txn_ext)
                 append(txn_info)
                 if len(undo_info) >= last - first:
                     break
         # Check we return at least one element, otherwise call
         # again but extend offset
         if len(undo_info) == 0 and not block:
-            undo_info = self.__undoLog(first=first, last=last*5, filter=filter,
-                    block=1, with_oids=with_oids)
+            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
+                    block=1)
         return undo_info
 
-    def undoLog(self, first, last, filter=None, block=0):
-        return self.__undoLog(first, last, filter, block)
-
-    def transactionLog(self, first, last):
-        return self.__undoLog(first, last, with_oids=True)
+    def transactionLog(self, start, stop, limit):
+        node_map = self.pt.getNodeMap()
+        node_list = node_map.keys()
+        node_list.sort(key=self.cp.getCellSortKey)
+        partition_set = set(range(self.pt.getPartitions()))
+        queue = self.local_var.queue
+        # request a tid list for each partition
+        self.local_var.tids_from = set()
+        for node in node_list:
+            conn = self.cp.getConnForNode(node)
+            request_set = set(node_map[node]) & partition_set
+            if conn is None or not request_set:
+                continue
+            partition_set -= set(request_set)
+            packet = Packets.AskTIDsFrom(start, stop, limit, request_set)
+            conn.ask(packet, queue=queue)
+            if not partition_set:
+                break
+        assert not partition_set
+        self.waitResponses()
+        # request transactions informations
+        txn_list = []
+        append = txn_list.append
+        tid = None
+        for tid in sorted(self.local_var.tids_from):
+            (txn_info, txn_ext) = self._getTransactionInformation(tid)
+            txn_info['ext'] = loads(self.local_var.txn_ext)
+            append(txn_info)
+        return (tid, txn_list)
 
     def history(self, oid, version=None, size=1, filter=None):
         # Get history informations for object first
@@ -1297,7 +1322,9 @@ class Application(object):
             assert real_tid == tid, (real_tid, tid)
         transaction_iter.close()
 
-    def iterator(self, start=None, stop=None):
+    def iterator(self, start, stop):
+        if start is None:
+            start = ZERO_TID
         return Iterator(self, start, stop)
 
     def lastTransaction(self):

Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -95,6 +95,11 @@ class StorageAnswersHandler(AnswerBaseHa
         if tid != self.app.getTID():
             raise NEOStorageError('Wrong TID, transaction not started')
 
+    def answerTIDsFrom(self, conn, tid_list):
+        neo.logging.debug('Get %d TIDs from %r', len(tid_list), conn)
+        assert not self.app.local_var.tids_from.intersection(set(tid_list))
+        self.app.local_var.tids_from.update(tid_list)
+
     def answerTransactionInformation(self, conn, tid,
                                            user, desc, ext, packed, oid_list):
         # transaction information are returned as a dict

Modified: trunk/neo/client/iterator.py
==============================================================================
--- trunk/neo/client/iterator.py [iso-8859-1] (original)
+++ trunk/neo/client/iterator.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -18,10 +18,12 @@
 from ZODB import BaseStorage
 from zope.interface import implements
 import ZODB.interfaces
-from neo import util
+from neo.util import u64, add64
 from neo.client.exception import NEOStorageCreationUndoneError
 from neo.client.exception import NEOStorageNotFoundError
 
+CHUNK_LENGTH = 100
+
 class Record(BaseStorage.DataRecord):
     """ TBaseStorageransaction record yielded by the Transaction object """
 
@@ -29,8 +31,8 @@ class Record(BaseStorage.DataRecord):
         BaseStorage.DataRecord.__init__(self, oid, tid, data, prev)
 
     def __str__(self):
-        oid = util.u64(self.oid)
-        tid = util.u64(self.tid)
+        oid = u64(self.oid)
+        tid = u64(self.tid)
         args = (oid, tid, len(self.data), self.data_txn)
         return 'Record %s:%s: %s (%s)' % args
 
@@ -86,7 +88,7 @@ class Transaction(BaseStorage.Transactio
         return record
 
     def __str__(self):
-        tid = util.u64(self.tid)
+        tid = u64(self.tid)
         args = (tid, self.user, self.status)
         return 'Transaction #%s: %s %s' % args
 
@@ -97,17 +99,15 @@ class Iterator(object):
     def __init__(self, app, start, stop):
         self.app = app
         self.txn_list = []
+        assert None not in (start, stop)
+        self._start = start
         self._stop = stop
-        # next index to load from storage nodes
-        self._next = 0
         # index of current iteration
         self._index = 0
         self._closed = False
         # OID -> previous TID mapping
         # TODO: prune old entries while walking ?
         self._prev_serial_dict = {}
-        if start is not None:
-            self.txn_list = self._skip(start)
 
     def __iter__(self):
         return self
@@ -118,41 +118,21 @@ class Iterator(object):
             raise IndexError, index
         return self.next()
 
-    def _read(self):
-        """ Request more transactions """
-        chunk = self.app.transactionLog(self._next, self._next + 100)
-        if not chunk:
-            # nothing more
-            raise StopIteration
-        self._next += len(chunk)
-        return chunk
-
-    def _skip(self, start):
-        """ Skip transactions until 'start' is reached """
-        chunk = self._read()
-        while chunk[0]['id'] < start:
-            chunk = self._read()
-        if chunk[-1]['id'] < start:
-            for index, txn in enumerate(reversed(chunk)):
-                if txn['id'] >= start:
-                    break
-            # keep only greater transactions
-            chunk = chunk[:-index]
-        return chunk
-
     def next(self):
         """ Return an iterator for the next transaction"""
         if self._closed:
             raise IOError, 'iterator closed'
         if not self.txn_list:
-            self.txn_list = self._read()
-        txn = self.txn_list.pop()
+            (max_tid, chunk) = self.app.transactionLog(self._start, self._stop,
+                CHUNK_LENGTH)
+            if not chunk:
+                # nothing more
+                raise StopIteration
+            self._start = add64(max_tid, 1)
+            self.txn_list = chunk
+        txn = self.txn_list.pop(0)
         self._index += 1
         tid = txn['id']
-        stop = self._stop
-        if stop is not None and stop < tid:
-            # stop reached
-            raise StopIteration
         user = txn['user_name']
         desc = txn['description']
         oid_list = txn['oids']

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -1098,12 +1098,29 @@ class AskTIDsFrom(Packet):
     S -> S.
     """
     _header_format = '!8s8sLL'
+    _list_entry_format = 'L'
+    _list_entry_len = calcsize(_list_entry_format)
 
-    def _encode(self, min_tid, max_tid, length, partition):
-        return pack(self._header_format, min_tid, max_tid, length, partition)
+    def _encode(self, min_tid, max_tid, length, partition_list):
+        body = [pack(self._header_format, min_tid, max_tid, length,
+            len(partition_list))]
+        list_entry_format = self._list_entry_format
+        for partition in partition_list:
+            body.append(pack(list_entry_format, partition))
+        return ''.join(body)
 
     def _decode(self, body):
-        return unpack(self._header_format, body) # min_tid, length, partition
+        body = StringIO(body)
+        read = body.read
+        header = unpack(self._header_format, read(self._header_len))
+        min_tid, max_tid, length, list_length = header
+        list_entry_format = self._list_entry_format
+        list_entry_len = self._list_entry_len
+        partition_list = []
+        for _ in xrange(list_length):
+            partition = unpack(list_entry_format, read(list_entry_len))[0]
+            partition_list.append(partition)
+        return (min_tid, max_tid, length, partition_list)
 
 class AnswerTIDsFrom(AnswerTIDs):
     """

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -86,6 +86,17 @@ class ClientOperationHandler(BaseClientA
         self._askStoreObject(conn, oid, serial, compression, checksum, data,
             data_serial, tid, time.time())
 
+    def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list):
+        app = self.app
+        getReplicationTIDList = app.dm.getReplicationTIDList
+        partitions = app.pt.getPartitions()
+        tid_list = []
+        extend = tid_list.extend
+        for partition in partition_list:
+            extend(getReplicationTIDList(min_tid, max_tid, length,
+                partitions, partition))
+        conn.answer(Packets.AnswerTIDsFrom(tid_list))
+
     def askTIDs(self, conn, first, last, partition):
         # This method is complicated, because I must return TIDs only
         # about usable partitions assigned to me.

Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -190,7 +190,7 @@ class ReplicationHandler(EventHandler):
         partition_id = replicator.getCurrentRID()
         max_tid = replicator.getCurrentCriticalTID()
         replicator.getTIDsFrom(min_tid, max_tid, length, partition_id)
-        return Packets.AskTIDsFrom(min_tid, max_tid, length, partition_id)
+        return Packets.AskTIDsFrom(min_tid, max_tid, length, [partition_id])
 
     def _doAskObjectHistoryFrom(self, min_oid, min_serial, length):
         replicator = self.app.replicator

Modified: trunk/neo/storage/handlers/storage.py
==============================================================================
--- trunk/neo/storage/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/storage.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -30,7 +30,9 @@ class StorageOperationHandler(BaseClient
         tid = app.dm.getLastTID()
         conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
 
-    def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
+    def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list):
+        assert len(partition_list) == 1, partition_list
+        partition = partition_list[0]
         app = self.app
         tid_list = app.dm.getReplicationTIDList(min_tid, max_tid, length,
             app.pt.getPartitions(), partition)

Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -426,10 +426,10 @@ class StorageReplicationHandlerTests(Neo
         self.assertEqual(pmin_tid, min_tid)
         self.assertEqual(pmax_tid, critical_tid)
         self.assertEqual(plength, length)
-        self.assertEqual(ppartition, rid)
+        self.assertEqual(ppartition, [rid])
         calls = app.replicator.mockGetNamedCalls('getTIDsFrom')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition)
+        calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition[0])
 
     def test_answerCheckTIDRangeDifferentSmallChunkWithoutNext(self):
         min_tid = self.getNextTID()
@@ -453,10 +453,10 @@ class StorageReplicationHandlerTests(Neo
         self.assertEqual(pmin_tid, min_tid)
         self.assertEqual(pmax_tid, critical_tid)
         self.assertEqual(plength, length - 1)
-        self.assertEqual(ppartition, rid)
+        self.assertEqual(ppartition, [rid])
         calls = app.replicator.mockGetNamedCalls('getTIDsFrom')
         self.assertEqual(len(calls), 1)
-        calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition)
+        calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition[0])
 
     # CheckSerialRange
     def test_answerCheckSerialFullRangeIdenticalChunkWithNext(self):

Modified: trunk/neo/tests/storage/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -119,7 +119,7 @@ class StorageStorageHandlerTests(NeoUnit
         self.app.pt = Mock({'getPartitions': 1})
         tid = self.getNextTID()
         tid2 = self.getNextTID()
-        self.operation.askTIDsFrom(conn, tid, tid2, 2, 1)
+        self.operation.askTIDsFrom(conn, tid, tid2, 2, [1])
         calls = self.app.dm.mockGetNamedCalls('getReplicationTIDList')
         self.assertEquals(len(calls), 1)
         calls[0].checkArgs(tid, tid2, 2, 1, 1)

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Wed Dec 15 18:23:37 2010
@@ -591,12 +591,12 @@ class ProtocolTests(NeoUnitTestBase):
     def test_AskTIDsFrom(self):
         tid = self.getNextTID()
         tid2 = self.getNextTID()
-        p = Packets.AskTIDsFrom(tid, tid2, 1000, 5)
+        p = Packets.AskTIDsFrom(tid, tid2, 1000, [5])
         min_tid, max_tid, length, partition = p.decode()
         self.assertEqual(min_tid, tid)
         self.assertEqual(max_tid, tid2)
         self.assertEqual(length, 1000)
-        self.assertEqual(partition, 5)
+        self.assertEqual(partition, [5])
 
     def test_AnswerTIDsFrom(self):
         self._test_AnswerTIDs(Packets.AnswerTIDsFrom)




More information about the Neo-report mailing list