[Neo-report] r2287 vincent - in /trunk: neo/ neo/client/ neo/client/handlers/ neo/master/ ...

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Sep 3 09:49:14 CEST 2010


Author: vincent
Date: Fri Sep  3 09:49:13 2010
New Revision: 2287

Log:
Implement distributed packing.

Modified:
    trunk/neo/client/Storage.py
    trunk/neo/client/app.py
    trunk/neo/client/handlers/master.py
    trunk/neo/client/iterator.py
    trunk/neo/handler.py
    trunk/neo/master/app.py
    trunk/neo/master/handlers/client.py
    trunk/neo/master/handlers/storage.py
    trunk/neo/protocol.py
    trunk/neo/storage/database/manager.py
    trunk/neo/storage/database/mysqldb.py
    trunk/neo/storage/handlers/master.py
    trunk/neo/storage/transactions.py
    trunk/neo/tests/__init__.py
    trunk/neo/tests/client/testClientApp.py
    trunk/neo/tests/client/testMasterHandler.py
    trunk/neo/tests/master/testClientHandler.py
    trunk/neo/tests/master/testStorageHandler.py
    trunk/neo/tests/storage/testMasterHandler.py
    trunk/neo/tests/storage/testTransactions.py
    trunk/neo/tests/testProtocol.py
    trunk/neo/tests/zodb/testPack.py
    trunk/tools/runner

Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -74,6 +74,11 @@ class Storage(BaseStorage.BaseStorage,
         return self.app.store(oid=oid, serial=serial,
             data=data, version=version, transaction=transaction)
 
+    @check_read_only
+    def deleteObject(oid, serial, transaction):
+        self.app.store(oid=oid, serial=serial, data='', version=None,
+            transaction=transaction)
+
     def getSerial(self, oid):
         try:
             return self.app.getSerial(oid = oid)
@@ -154,8 +159,11 @@ class Storage(BaseStorage.BaseStorage,
     def restore(self, oid, serial, data, version, prev_txn, transaction):
         raise NotImplementedError
 
-    def pack(self, t, referencesf):
-        raise NotImplementedError
+    def pack(self, t, referencesf, gc=False):
+        if gc:
+            logging.warning('Garbage Collection is not available in NEO, '
+                'please use an external tool. Packing without GC.')
+        self.app.pack(t)
 
     def lastSerial(self):
         # seems unused

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -24,12 +24,13 @@ import time
 
 from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
 from ZODB.ConflictResolution import ResolvedSerial
+from persistent.TimeStamp import TimeStamp
 
 from neo import setupLog
 setupLog('CLIENT', verbose=True)
 
 from neo import logging
-from neo.protocol import NodeTypes, Packets, INVALID_PARTITION
+from neo.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID
 from neo.event import EventManager
 from neo.util import makeChecksum as real_makeChecksum, dump
 from neo.locking import Lock
@@ -1229,3 +1230,9 @@ class Application(object):
     def isTransactionVoted(self):
         return self.local_var.txn_voted
 
+    def pack(self, t):
+        tid = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60, )))
+        if tid == ZERO_TID:
+            raise NEOStorageError('Invalid pack time')
+        self._askPrimary(Packets.AskPack(tid))
+

Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -21,6 +21,7 @@ from neo.client.handlers import BaseHand
 from neo.pt import MTPartitionTable as PartitionTable
 from neo.protocol import NodeTypes, NodeStates, ProtocolError
 from neo.util import dump
+from neo.client.exception import NEOStorageError
 
 class PrimaryBootstrapHandler(AnswerBaseHandler):
     """ Bootstrap handler used when looking for the primary master """
@@ -170,3 +171,7 @@ class PrimaryAnswersHandler(AnswerBaseHa
             raise ProtocolError('Wrong TID, transaction not started')
         self.app.setTransactionFinished()
 
+    def answerPack(self, conn, status):
+        if not status:
+            raise NEOStorageError('Already packing')
+

Modified: trunk/neo/client/iterator.py
==============================================================================
--- trunk/neo/client/iterator.py [iso-8859-1] (original)
+++ trunk/neo/client/iterator.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -18,6 +18,7 @@
 from ZODB import BaseStorage
 from neo import util
 from neo.client.exception import NEOStorageCreationUndoneError
+from neo.client.exception import NEOStorageNotFoundError
 
 class Record(BaseStorage.DataRecord):
     """ TBaseStorageransaction record yielded by the Transaction object """
@@ -60,17 +61,27 @@ class Transaction(BaseStorage.Transactio
         app = self.app
         oid_list = self.oid_list
         oid_index = self.oid_index
-        if self.oid_index >= len(oid_list):
+        oid_len = len(oid_list)
+        # load an object
+        while oid_index < oid_len:
+            oid = oid_list[oid_index]
+            try:
+                data, _, next_tid = app._load(oid, serial=self.tid)
+            except NEOStorageCreationUndoneError:
+                data = next_tid = None
+            except NEOStorageNotFoundError:
+                # Transactions are not updated after a pack, so their object
+                # will not be found in the database. Skip them.
+                oid_list.pop(oid_index)
+                oid_len -= 1
+                continue
+            oid_index += 1
+            break
+        else:
             # no more records for this transaction
             self.oid_index = 0
             raise StopIteration
-        oid = oid_list[oid_index]
-        self.oid_index = oid_index + 1
-        # load an object
-        try:
-            data, _, next_tid = app._load(oid, serial=self.tid)
-        except NEOStorageCreationUndoneError:
-            data = next_tid = None
+        self.oid_index = oid_index
         record = Record(oid, self.tid, '', data,
             self.prev_serial_dict.get(oid))
         if next_tid is None:

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -353,6 +353,12 @@ class EventHandler(object):
     def answerBarrier(self, conn):
         pass
 
+    def askPack(self, conn, tid):
+        raise UnexpectedPacketError
+
+    def answerPack(self, conn, status):
+        raise UnexpectedPacketError
+
     # Error packet handlers.
 
     def error(self, conn, code, message):
@@ -468,6 +474,8 @@ class EventHandler(object):
         d[Packets.AnswerHasLock] = self.answerHasLock
         d[Packets.AskBarrier] = self.askBarrier
         d[Packets.AnswerBarrier] = self.answerBarrier
+        d[Packets.AskPack] = self.askPack
+        d[Packets.AnswerPack] = self.answerPack
 
         return d
 

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -40,6 +40,7 @@ from neo.live_debug import register as r
 
 class Application(object):
     """The master node application."""
+    packing = None
 
     def __init__(self, config):
 

Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -88,3 +88,15 @@ class ClientServiceHandler(MasterHandler
         node = self.app.nm.getByUUID(conn.getUUID())
         app.tm.prepare(node, tid, oid_list, used_uuid_set, conn.getPeerId())
 
+    def askPack(self, conn, tid):
+        app = self.app
+        if app.packing is None:
+            storage_list = self.app.nm.getStorageList(only_identified=True)
+            app.packing = (conn, conn.getPeerId(),
+                set(x.getUUID() for x in storage_list))
+            p = Packets.AskPack(tid)
+            for storage in storage_list:
+                storage.getConnection().ask(p)
+        else:
+            conn.answer(Packets.AnswerPack(False))
+

Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -22,6 +22,7 @@ from neo.protocol import CellStates, Pac
 from neo.master.handlers import BaseServiceHandler
 from neo.exception import OperationFailure
 from neo.util import dump
+from neo.connector import ConnectorConnectionClosedException
 
 
 class StorageServiceHandler(BaseServiceHandler):
@@ -46,6 +47,9 @@ class StorageServiceHandler(BaseServiceH
             # if a transaction is known, this means that it's being committed
             if transaction.forget(uuid):
                 self._afterLock(tid)
+        packing = self.app.packing
+        if packing is not None:
+            self.answerPack(conn, False)
 
     def askLastIDs(self, conn):
         app = self.app
@@ -124,4 +128,15 @@ class StorageServiceHandler(BaseServiceH
                 break
         self.app.broadcastPartitionChanges(cell_list)
 
+    def answerPack(self, conn, status):
+        app = self.app
+        if app.packing is not None:
+            client, msg_id, uid_set = app.packing
+            uid_set.remove(conn.getUUID())
+            if not uid_set:
+                app.packing = None
+                try:
+                    client.answer(Packets.AnswerPack(True), msg_id=msg_id)
+                except ConnectorConnectionClosedException:
+                    pass
 

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -1634,6 +1634,32 @@ class AskBarrier(Packet):
 class AnswerBarrier(Packet):
     pass
 
+class AskPack(Packet):
+    """
+    Request a pack at given TID.
+    C -> M
+    M -> S
+    """
+    def _encode(self, tid):
+        return _encodeTID(tid)
+
+    def _decode(self, body):
+        return (_decodeTID(body), )
+
+class AnswerPack(Packet):
+    """
+    Inform that packing is over.
+    S -> M
+    M -> C
+    """
+    _header_format = '!H'
+
+    def _encode(self, status):
+        return pack(self._header_format, int(status))
+
+    def _decode(self, body):
+        return (bool(unpack(self._header_format, body)[0]), )
+
 class Error(Packet):
     """
     Error is a special type of message, because this can be sent against
@@ -1873,6 +1899,10 @@ class PacketRegistry(dict):
             0x037,
             AskBarrier,
             AnswerBarrier)
+    AskPack, AnswerPack = register(
+            0x0038,
+            AskPack,
+            AnswerPack)
 
 # build a "singleton"
 Packets = PacketRegistry()

Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -323,3 +323,19 @@ class DatabaseManager(object):
         the given list."""
         raise NotImplementedError
 
+    def pack(self, tid, updateObjectDataForPack):
+        """Prune all non-current object revisions at given tid.
+        updateObjectDataForPack is a function called for each deleted object
+        and revision with:
+        - OID
+        - packed TID
+        - new value_serial
+            If object data was moved to an after-pack-tid revision, this
+            parameter contains the TID of that revision, allowing to backlink
+            to it.
+        - getObjectData function
+            To call if value_serial is None and an object needs to be updated.
+            Takes no parameter, returns a 3-tuple: compression, checksum,
+            value
+        """
+

Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -206,6 +206,16 @@ class MySQLDatabaseManager(DatabaseManag
             value = "'%s'" % (e(str(value)), )
         q("""REPLACE INTO config VALUES ('%s', %s)""" % (key, value))
 
+    def _setPackTID(self, tid):
+        self._setConfiguration('_pack_tid', tid)
+
+    def _getPackTID(self):
+        try:
+            result = int(self.getConfiguration('_pack_tid'))
+        except KeyError:
+            result = -1
+        return result
+
     def getPartitionTable(self):
         q = self.query
         cell_list = q("""SELECT rid, uuid, state FROM pt""")
@@ -618,9 +628,11 @@ class MySQLDatabaseManager(DatabaseManag
         q = self.query
         oid = util.u64(oid)
         p64 = util.p64
+        pack_tid = self._getPackTID()
         r = q("""SELECT serial, LENGTH(value), value_serial FROM obj
-                    WHERE oid = %d ORDER BY serial DESC LIMIT %d, %d""" \
-                % (oid, offset, length))
+                    WHERE oid = %d AND serial >= %d
+                    ORDER BY serial DESC LIMIT %d, %d""" \
+                % (oid, pack_tid, offset, length))
         if r:
             result = []
             append = result.append
@@ -683,3 +695,92 @@ class MySQLDatabaseManager(DatabaseManag
                 % (oid, ','.join([str(util.u64(serial)) for serial in serial_list])))
         return [util.p64(t[0]) for t in r]
 
+    def _updatePackFuture(self, oid, orig_serial, max_serial,
+            updateObjectDataForPack):
+        q = self.query
+        p64 = util.p64
+        # Before deleting this object's revision, see if there is any
+        # transaction referencing its value at max_serial or above.
+        # If there is, copy value to the first future transaction. Any further
+        # reference is just updated to point to the new data location.
+        value_serial = None
+        for table in ('obj', 'tobj'):
+            for (serial, ) in q('SELECT serial FROM %(table)s WHERE '
+                    'oid = %(oid)d AND serial >= %(max_serial)d AND '
+                    'value_serial = %(orig_serial)d ORDER BY serial ASC' % {
+                        'table': table,
+                        'oid': oid,
+                        'orig_serial': orig_serial,
+                        'max_serial': max_serial,
+                    }):
+                if value_serial is None:
+                    # First found, copy data to it and mark its serial for
+                    # future reference.
+                    value_serial = serial
+                    q('REPLACE INTO %(table)s (oid, serial, compression, '
+                        'checksum, value, value_serial) SELECT oid, '
+                        '%(serial)d, compression, checksum, value, NULL FROM '
+                        'obj WHERE oid = %(oid)d AND serial = %(orig_serial)d' \
+                        % {
+                            'table': table,
+                            'oid': oid,
+                            'serial': serial,
+                            'orig_serial': orig_serial,
+                    })
+                else:
+                    q('REPLACE INTO %(table)s (oid, serial, value_serial) '
+                        'VALUES (%(oid)d, %(serial)d, %(value_serial)d)' % {
+                            'table': table,
+                            'oid': oid,
+                            'serial': serial,
+                            'value_serial': value_serial,
+                    })
+        def getObjectData():
+            assert value_serial is None
+            return q('SELECT compression, checksum, value FROM obj WHERE '
+                'oid = %(oid)d AND serial = %(orig_serial)d' % {
+                    'oid': oid,
+                    'orig_serial': orig_serial,
+                })[0]
+        if value_serial:
+            value_serial = p64(value_serial)
+        updateObjectDataForPack(p64(oid), p64(orig_serial), value_serial,
+            getObjectData)
+
+    def pack(self, tid, updateObjectDataForPack):
+        # TODO: unit test (along with updatePackFuture)
+        q = self.query
+        tid = util.u64(tid)
+        updatePackFuture = self._updatePackFuture
+        self.begin()
+        try:
+            self._setPackTID(tid)
+            for count, oid, max_serial in q('SELECT COUNT(*) - 1, oid, '
+                    'MAX(serial) FROM obj WHERE serial <= %(tid)d '
+                    'GROUP BY oid' % {'tid': tid}):
+                if q('SELECT LENGTH(value) FROM obj WHERE oid = %(oid)d AND '
+                        'serial = %(max_serial)d' % {
+                            'oid': oid,
+                            'max_serial': max_serial,
+                        })[0][0] == 0:
+                    count += 1
+                    max_serial += 1
+                if count:
+                    # There are things to delete for this object
+                    for (serial, ) in q('SELECT serial FROM obj WHERE '
+                            'oid=%(oid)d AND serial < %(max_serial)d' % {
+                                'oid': oid,
+                                'max_serial': max_serial,
+                            }):
+                        updatePackFuture(oid, serial, max_serial,
+                            updateObjectDataForPack)
+                        q('DELETE FROM obj WHERE oid=%(oid)d AND '
+                            'serial=%(serial)d' % {
+                                'oid': oid,
+                                'serial': serial
+                        })
+        except:
+            self.rollback()
+            raise
+        self.commit()
+

Modified: trunk/neo/storage/handlers/master.py
==============================================================================
--- trunk/neo/storage/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/master.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -64,3 +64,11 @@ class MasterOperationHandler(BaseMasterH
             raise ProtocolError('Unknown transaction')
         # TODO: send an answer
         self.app.tm.unlock(tid)
+
+    def askPack(self, conn, tid):
+        app = self.app
+        logging.info('Pack started, up to %s...', dump(tid))
+        app.dm.pack(tid, app.tm.updateObjectDataForPack)
+        logging.info('Pack finished.')
+        conn.answer(Packets.AnswerPack(True))
+

Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -285,3 +285,19 @@ class TransactionManager(object):
         for oid, tid in self._store_lock_dict.items():
             logging.info('    %r by %r', dump(oid), dump(tid))
 
+    def updateObjectDataForPack(self, oid, orig_serial, new_serial,
+            getObjectData):
+        lock_tid = self.getLockingTID(oid)
+        if lock_tid is not None:
+            transaction = self._transaction_dict[lock_tid]
+            oid, compression, checksum, data, value_serial = \
+                transaction.getObject(oid)
+            if value_serial == orig_serial:
+                if new_serial:
+                    value_serial = new_serial
+                else:
+                    compression, checksum, data = getObjectData()
+                    value_serial = None
+                transaction.addObject(oid, compression, checksum, data,
+                    value_serial)
+

Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -156,7 +156,7 @@ class NeoTestBase(unittest.TestCase):
         })
 
     def getFakeConnection(self, uuid=None, address=('127.0.0.1', 10000),
-            is_server=False, connector=None):
+            is_server=False, connector=None, peer_id=None):
         if connector is None:
             connector = self.getFakeConnector()
         return Mock({
@@ -166,6 +166,7 @@ class NeoTestBase(unittest.TestCase):
              '__repr__': 'FakeConnection',
             '__nonzero__': 0,
             'getConnector': connector,
+            'getPeerId': peer_id,
         })
 
     def checkProtocolErrorRaised(self, method, *args, **kwargs):

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -25,6 +25,7 @@ from neo.client.exception import NEOStor
 from neo.client.exception import NEOStorageDoesNotExistError
 from neo.protocol import Packet, Packets, Errors, INVALID_TID, INVALID_SERIAL
 from neo.util import makeChecksum
+import time
 
 def _getMasterConnection(self):
     if self.master_conn is None:
@@ -1164,6 +1165,21 @@ class ClientApplicationTests(NeoTestBase
         self.assertTrue(hasattr(app1_local, property_id))
         self.assertFalse(hasattr(app2_local, property_id))
 
+    def test_pack(self):
+        app = self.getApp()
+        marker = []
+        def askPrimary(packet):
+            marker.append(packet)
+        app._askPrimary = askPrimary
+        # XXX: could not identify a value causing TimeStamp to return ZERO_TID
+        #self.assertRaises(NEOStorageError, app.pack, )
+        self.assertEqual(len(marker), 0)
+        now = time.time()
+        app.pack(now)
+        self.assertEqual(len(marker), 1)
+        self.assertEqual(marker[0].getType(), Packets.AskPack)
+        # XXX: how to validate packet content ?
+
 if __name__ == '__main__':
     unittest.main()
 

Modified: trunk/neo/tests/client/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -23,6 +23,7 @@ from neo.protocol import NodeTypes, Node
 from neo.client.handlers.master import PrimaryBootstrapHandler
 from neo.client.handlers.master import PrimaryNotificationsHandler, \
        PrimaryAnswersHandler
+from neo.client.exception import NEOStorageError
 
 MARKER = []
 
@@ -255,6 +256,10 @@ class MasterAnswersHandlerTests(MasterHa
         calls = app.mockGetNamedCalls('setTransactionFinished')
         self.assertEqual(len(calls), 1)
         
+    def test_answerPack(self):
+        self.assertRaises(NEOStorageError, self.handler.answerPack, None, False)
+        # Check it doesn't raise
+        self.handler.answerPack(None, True)
 
 if __name__ == '__main__':
     unittest.main()

Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -19,7 +19,7 @@ import unittest
 from mock import Mock
 from struct import pack, unpack
 from neo.tests import NeoTestBase
-from neo.protocol import NodeTypes, NodeStates
+from neo.protocol import NodeTypes, NodeStates, Packets
 from neo.master.handlers.client import ClientServiceHandler
 from neo.master.app import Application
 
@@ -154,6 +154,34 @@ class MasterClientHandlerTests(NeoTestBa
         self.__testWithMethod(self.service.connectionClosed,
             NodeStates.TEMPORARILY_DOWN)
 
+    def test_askPack(self):
+        self.assertEqual(self.app.packing, None)
+        self.app.nm.createClient()
+        tid = self.getNextTID()
+        peer_id = 42
+        conn = self.getFakeConnection(peer_id=peer_id)
+        storage_uuid = self.identifyToMasterNode()
+        storage_conn = self.getFakeConnection(storage_uuid,
+            self.storage_address)
+        self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
+        self.service.askPack(conn, tid)
+        self.checkNoPacketSent(conn)
+        ptid = self.checkAskPacket(storage_conn, Packets.AskPack,
+            decode=True)[0]
+        self.assertEqual(ptid, tid)
+        self.assertTrue(self.app.packing[0] is conn)
+        self.assertEqual(self.app.packing[1], peer_id)
+        self.assertEqual(self.app.packing[2], set([storage_uuid, ]))
+        # Asking again to pack will cause an immediate error
+        storage_uuid = self.identifyToMasterNode()
+        storage_conn = self.getFakeConnection(storage_uuid,
+            self.storage_address)
+        self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
+        self.service.askPack(conn, tid)
+        self.checkNoPacketSent(storage_conn)
+        status = self.checkAnswerPacket(conn, Packets.AnswerPack,
+            decode=True)[0]
+        self.assertFalse(status)
 
 if __name__ == '__main__':
     unittest.main()

Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -19,7 +19,7 @@ import unittest
 from mock import Mock
 from struct import pack
 from neo.tests import NeoTestBase
-from neo.protocol import NodeTypes, NodeStates
+from neo.protocol import NodeTypes, NodeStates, Packets
 from neo.master.handlers.storage import StorageServiceHandler
 from neo.master.handlers.client import ClientServiceHandler
 from neo.master.app import Application
@@ -247,6 +247,30 @@ class MasterStorageHandlerTests(NeoTestB
         # T3: action not significant to this transacion, so no response
         self.checkNoPacketSent(cconn3, check_notify=False)
 
+    def test_answerPack(self):
+        # Note: incoming status has no meaning here, so it's left to False.
+        node1, conn1 = self._getStorage()
+        node2, conn2 = self._getStorage()
+        self.app.packing = None
+        # Does nothing
+        self.service.answerPack(None, False)
+
+        client_conn = Mock({
+            'getPeerId': 512,
+        })
+        client_peer_id = 42
+        self.app.packing = (client_conn, client_peer_id, set([conn1.getUUID(),
+            conn2.getUUID()]))
+        self.service.answerPack(conn1, False)
+        self.checkNoPacketSent(client_conn)
+        self.assertEqual(self.app.packing[2], set([conn2.getUUID(), ]))
+        self.service.answerPack(conn2, False)
+        status = self.checkAnswerPacket(client_conn, Packets.AnswerPack,
+            decode=True)[0]
+        # TODO: verify packet peer id
+        self.assertTrue(status)
+        self.assertEqual(self.app.packing, None)
+
 if __name__ == '__main__':
     unittest.main()
 

Modified: trunk/neo/tests/storage/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testMasterHandler.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -23,7 +23,7 @@ from neo.storage.app import Application
 from neo.storage.handlers.master import MasterOperationHandler
 from neo.exception import PrimaryFailure, OperationFailure
 from neo.pt import PartitionTable
-from neo.protocol import CellStates, ProtocolError
+from neo.protocol import CellStates, ProtocolError, Packets
 from neo.protocol import INVALID_TID, INVALID_OID
 
 class StorageMasterHandlerTests(NeoTestBase):
@@ -196,5 +196,16 @@ class StorageMasterHandlerTests(NeoTestB
         self.assertEquals(len(calls), 1)
         calls[0].checkArgs((INVALID_TID, ))
 
+    def test_askPack(self):
+        self.app.dm = Mock({'pack': None})
+        conn = self.getFakeConnection()
+        tid = self.getNextTID()
+        self.operation.askPack(conn, tid)
+        calls = self.app.dm.mockGetNamedCalls('pack')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(tid, self.app.tm.updateObjectDataForPack)
+        # Content has no meaning here, don't check.
+        self.checkAnswerPacket(conn, Packets.AnswerPack)
+
 if __name__ == "__main__":
     unittest.main()

Modified: trunk/neo/tests/storage/testTransactions.py
==============================================================================
--- trunk/neo/tests/storage/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testTransactions.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -334,5 +334,57 @@ class TransactionManagerTests(NeoTestBas
         self.manager.storeObject(tid1, serial1, *obj1)
         self.assertEqual(self.manager.getLockingTID(oid1), tid1)
 
+    def test_updateObjectDataForPack(self):
+        ram_serial = self.getNextTID()
+        oid = self.getOID(1)
+        orig_serial = self.getNextTID()
+        uuid = self.getNewUUID()
+        locking_serial = self.getNextTID()
+        other_serial = self.getNextTID()
+        new_serial = self.getNextTID()
+        compression = 1
+        checksum = 42
+        value = 'foo'
+        self.manager.register(uuid, locking_serial)
+        def getObjectData():
+            return (compression, checksum, value)
+        # Object not known, nothing happens
+        self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
+            oid), None)
+        self.manager.updateObjectDataForPack(oid, orig_serial, None, None)
+        self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
+            oid), None)
+        # Object known, but doesn't point at orig_serial, it is not updated
+        self.manager.storeObject(locking_serial, ram_serial, oid, 0, 512,
+            'bar', None)
+        orig_object = self.manager.getObjectFromTransaction(locking_serial,
+            oid)
+        self.manager.updateObjectDataForPack(oid, orig_serial, None, None)
+        self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
+            oid), orig_object)
+
+        self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
+            None, other_serial)
+        orig_object = self.manager.getObjectFromTransaction(locking_serial,
+            oid)
+        self.manager.updateObjectDataForPack(oid, orig_serial, None, None)
+        self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
+            oid), orig_object)
+        # Object known and points at undone data: it gets updated
+        # ...with data_serial: getObjectData must not be called
+        self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
+            None, orig_serial)
+        self.manager.updateObjectDataForPack(oid, orig_serial, new_serial,
+            None)
+        self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
+            oid), (oid, None, None, None, new_serial))
+        # with data
+        self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
+            None, orig_serial)
+        self.manager.updateObjectDataForPack(oid, orig_serial, None,
+            getObjectData)
+        self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
+            oid), (oid, compression, checksum, value, None))
+
 if __name__ == "__main__":
     unittest.main()

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -635,6 +635,18 @@ class ProtocolTests(NeoTestBase):
     def test_AnswerObjectHistoryFrom(self):
         self._testXIDAndYIDList(Packets.AnswerObjectHistoryFrom)
 
+    def test_AskPack(self):
+        tid = self.getNextTID()
+        p = Packets.AskPack(tid)
+        ptid = p.decode()[0]
+        self.assertEqual(ptid, tid)
+
+    def test_AnswerPack(self):
+        status = True
+        p = Packets.AnswerPack(status)
+        pstatus = p.decode()[0]
+        self.assertEqual(pstatus, status)
+
 if __name__ == '__main__':
     unittest.main()
 

Modified: trunk/neo/tests/zodb/testPack.py
==============================================================================
--- trunk/neo/tests/zodb/testPack.py [iso-8859-1] (original)
+++ trunk/neo/tests/zodb/testPack.py [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -17,14 +17,18 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 import unittest
-from ZODB.tests.PackableStorage import PackableStorage
+try:
+    from ZODB.tests.PackableStorage import PackableStorageWithOptionalGC
+except ImportError:
+    from ZODB.tests.PackableStorage import PackableStorage as \
+        PackableStorageWithOptionalGC
 from ZODB.tests.PackableStorage import PackableUndoStorage
 from ZODB.tests.StorageTestBase import StorageTestBase
 
 from neo.tests.zodb import ZODBTestCase
 
-class PackableTests(ZODBTestCase, StorageTestBase, PackableStorage,
-        PackableUndoStorage):
+class PackableTests(ZODBTestCase, StorageTestBase,
+        PackableStorageWithOptionalGC, PackableUndoStorage):
     pass
 
 if __name__ == "__main__":

Modified: trunk/tools/runner
==============================================================================
--- trunk/tools/runner [iso-8859-1] (original)
+++ trunk/tools/runner [iso-8859-1] Fri Sep  3 09:49:13 2010
@@ -76,7 +76,7 @@ ZODB_TEST_MODULES = [
     ('neo.tests.zodb.testHistory', 'check'),
     ('neo.tests.zodb.testIterator', 'check'),
     ('neo.tests.zodb.testMT', 'check'),
-    # ('neo.tests.zodb.testPack', 'check'),
+    ('neo.tests.zodb.testPack', 'check'),
     ('neo.tests.zodb.testPersistent', 'check'),
     ('neo.tests.zodb.testReadOnly', 'check'),
     ('neo.tests.zodb.testRevision', 'check'),





More information about the Neo-report mailing list