[Neo-report] r2180 gregory - in /trunk/neo: master/ master/handlers/ storage/ storage/hand...

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Jun 21 17:53:51 CEST 2010


Author: gregory
Date: Mon Jun 21 17:53:41 2010
New Revision: 2180

Log:
Move stored OIDs check to master side.

- The storages no longer check the last OID during a store
- The storages unconditionally store the last OID notified by the master
- The master checks at commit time whether a greater OID was used by a client
- The master always notifies the last OID when a pool is generated or if the
check above is true
- The master's transaction manager manages the last OID and the OID generator

Modified:
    trunk/neo/master/app.py
    trunk/neo/master/handlers/client.py
    trunk/neo/master/handlers/storage.py
    trunk/neo/master/recovery.py
    trunk/neo/master/transactions.py
    trunk/neo/storage/handlers/__init__.py
    trunk/neo/storage/handlers/initialization.py
    trunk/neo/storage/transactions.py
    trunk/neo/tests/__init__.py
    trunk/neo/tests/functional/testClient.py
    trunk/neo/tests/master/testClientHandler.py
    trunk/neo/tests/master/testMasterApp.py
    trunk/neo/tests/master/testRecovery.py
    trunk/neo/tests/master/testStorageHandler.py
    trunk/neo/tests/master/testTransactions.py

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -18,7 +18,6 @@
 from neo import logging
 import os, sys
 from time import time
-from struct import pack, unpack
 
 from neo import protocol
 from neo.protocol import UUID_NAMESPACES
@@ -84,9 +83,6 @@ class Application(object):
             uuid = self.getNewUUID(NodeTypes.MASTER)
         self.uuid = uuid
 
-        # The last OID.
-        self.loid = None
-
         # election related data
         self.unconnected_master_node_set = set()
         self.negotiating_master_node_set = set()
@@ -303,7 +299,8 @@ class Application(object):
         " Outdate cell of non-working nodes and broadcast changes """
         self.broadcastPartitionChanges(self.pt.outdate())
 
-    def broadcastLastOID(self, oid):
+    def broadcastLastOID(self):
+        oid = self.tm.getLastOID()
         logging.debug('Broadcast last OID to storages : %s' % dump(oid))
         packet = Packets.NotifyLastOID(oid)
         for node in self.nm.getIdentifiedList():
@@ -456,15 +453,6 @@ class Application(object):
             handler.connectionCompleted(conn)
         self.cluster_state = state
 
-    def getNewOIDList(self, num_oids):
-        if self.loid is None:
-            raise RuntimeError, 'I do not know the last OID'
-        oid = unpack('!Q', self.loid)[0] + 1
-        oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
-        self.loid = oid_list[-1]
-        self.broadcastLastOID(self.loid)
-        return oid_list
-
     def getNewUUID(self, node_type):
         # build an UUID
         uuid = os.urandom(15)

Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -56,8 +56,8 @@ class ClientServiceHandler(MasterHandler
         conn.answer(Packets.AnswerBeginTransaction(tid))
 
     def askNewOIDs(self, conn, num_oids):
-        oid_list = self.app.getNewOIDList(num_oids)
-        conn.answer(Packets.AnswerNewOIDs(oid_list))
+        conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
+        self.app.broadcastLastOID()
 
     def askFinishTransaction(self, conn, tid, oid_list):
         app = self.app
@@ -78,6 +78,10 @@ class ClientServiceHandler(MasterHandler
             uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part)
                              if cell.getNodeState() != NodeStates.HIDDEN))
 
+        # check if greater and foreign OID was stored
+        if self.app.tm.updateLastOID(oid_list):
+            self.app.broadcastLastOID()
+
         # Request locking data.
         # build a new set as we may not send the message to all nodes as some
         # might be not reachable at that time

Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -48,8 +48,9 @@ class StorageServiceHandler(BaseServiceH
 
     def askLastIDs(self, conn):
         app = self.app
-        conn.answer(Packets.AnswerLastIDs(app.loid, app.tm.getLastTID(),
-                    app.pt.getID()))
+        loid = app.tm.getLastOID()
+        ltid = app.tm.getLastTID()
+        conn.answer(Packets.AnswerLastIDs(loid, ltid, app.pt.getID()))
 
     def askUnfinishedTransactions(self, conn):
         p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())

Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -59,7 +59,7 @@ class RecoveryManager(MasterHandler):
         self.app.changeClusterState(ClusterStates.RECOVERING)
         em = self.app.em
 
-        self.app.loid = None
+        self.app.tm.setLastOID(None)
         self.app.pt.setID(None)
 
         # collect the last partition table available
@@ -81,7 +81,7 @@ class RecoveryManager(MasterHandler):
         self.app.broadcastNodesInformation(refused_node_set)
 
         logging.debug('cluster starts with loid=%s and this partition table :',
-                dump(self.app.loid))
+                dump(self.app.tm.getLastOID()))
         self.app.pt.log()
 
     def buildFromScratch(self):
@@ -96,7 +96,7 @@ class RecoveryManager(MasterHandler):
             node.setRunning()
         self.app.broadcastNodesInformation(node_list)
         # resert IDs generators
-        self.app.loid = '\0' * 8
+        self.app.tm.setLastOID('\0' * 8)
         # build the partition with this node
         pt.setID(pack('!Q', 1))
         pt.make(node_list)
@@ -115,13 +115,9 @@ class RecoveryManager(MasterHandler):
             conn.ask(Packets.AskLastIDs())
 
     def answerLastIDs(self, conn, loid, ltid, lptid):
-        app = self.app
         # Get max values.
         if loid is not None:
-            if app.loid is None:
-                app.loid = loid
-            else:
-                app.loid = max(loid, app.loid)
+            self.app.tm.setLastOID(max(loid, self.app.tm.getLastOID()))
         if ltid is not None:
             self.app.tm.setLastTID(ltid)
         if lptid > self.target_ptid:
@@ -130,7 +126,6 @@ class RecoveryManager(MasterHandler):
             conn.ask(Packets.AskPartitionTable())
 
     def answerPartitionTable(self, conn, ptid, row_list):
-        app = self.app
         if ptid != self.target_ptid:
             # If this is not from a target node, ignore it.
             logging.warn('Got %s while waiting %s', dump(ptid),

Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -19,7 +19,6 @@ from time import time, gmtime
 from struct import pack, unpack
 from neo.util import dump
 from neo import logging
-from neo import protocol
 
 class Transaction(object):
     """
@@ -127,6 +126,7 @@ class TransactionManager(object):
         # node -> transactions mapping
         self._node_dict = {}
         self._last_tid = None
+        self._last_oid = None
 
     def __getitem__(self, tid):
         """
@@ -143,6 +143,32 @@ class TransactionManager(object):
     def items(self):
         return self._tid_dict.items()
 
+    def getNextOIDList(self, num_oids):
+        """ Generate a new OID list """
+        if self._last_oid is None:
+            raise RuntimeError, 'I do not know the last OID'
+        oid = unpack('!Q', self._last_oid)[0] + 1
+        oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
+        self._last_oid = oid_list[-1]
+        return oid_list
+
+    def updateLastOID(self, oid_list):
+        """
+            Updates the last oid with the max of those supplied if greater than
+            the current known, returns True if changed
+        """
+        max_oid = oid_list and max(oid_list) or None # oid_list might be empty
+        if max_oid > self._last_oid:
+            self._last_oid = max_oid
+            return True
+        return False
+
+    def setLastOID(self, oid):
+        self._last_oid = oid
+
+    def getLastOID(self):
+        return self._last_oid
+
     def _nextTID(self):
         """ Compute the next TID based on the current time and check collisions """
         tm = time()

Modified: trunk/neo/storage/handlers/__init__.py
==============================================================================
--- trunk/neo/storage/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/__init__.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -39,7 +39,6 @@ class BaseMasterHandler(EventHandler):
                 self.__class__.__name__)
 
     def notifyLastOID(self, conn, oid):
-        self.app.tm.setLastOID(oid)
         self.app.dm.setLastOID(oid)
 
     def notifyNodeInformation(self, conn, node_list):

Modified: trunk/neo/storage/handlers/initialization.py
==============================================================================
--- trunk/neo/storage/handlers/initialization.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/initialization.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -56,7 +56,6 @@ class InitializationHandler(BaseMasterHa
         self.app.has_partition_table = True
 
     def answerLastIDs(self, conn, loid, ltid, lptid):
-        self.app.tm.setLastOID(loid)
         self.app.dm.setLastOID(loid)
         self.app.has_last_ids = True
 

Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -114,8 +114,6 @@ class TransactionManager(object):
         self._store_lock_dict = {}
         self._load_lock_dict = {}
         self._uuid_dict = {}
-        self._loid = None
-        self._loid_seen = None
 
     def __contains__(self, tid):
         """
@@ -144,10 +142,6 @@ class TransactionManager(object):
             result = result.getObject(oid)
         return result
 
-    def setLastOID(self, oid):
-        assert oid >= self._loid
-        self._loid = oid
-
     def reset(self):
         """
             Reset the transaction manager
@@ -186,13 +180,6 @@ class TransactionManager(object):
         self._app.dm.finishTransaction(tid)
         self.abort(tid, even_if_locked=True)
 
-        # update loid if needed
-        if self._loid_seen > self._loid:
-            args = dump(self._loid_seen), dump(self._loid)
-            logging.warning('Greater OID used in StoreObject : %s > %s', *args)
-            self._loid = self._loid_seen
-            self._app.dm.setLastOID(self._loid)
-
     def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
         """
             Store transaction information received from client node
@@ -240,9 +227,6 @@ class TransactionManager(object):
         transaction = self._transaction_dict[tid]
         transaction.addObject(oid, compression, checksum, data, value_serial)
 
-        # update loid
-        self._loid_seen = oid
-
     def abort(self, tid, even_if_locked=True):
         """
             Abort a transaction

Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -295,6 +295,9 @@ class NeoTestBase(unittest.TestCase):
     def checkAbortTransaction(self, conn, **kw):
         return self.checkNotifyPacket(conn, Packets.AbortTransaction, **kw)
 
+    def checkNotifyLastOID(self, conn, **kw):
+        return self.checkNotifyPacket(conn, Packets.NotifyLastOID, **kw)
+
     def checkAnswerTransactionFinished(self, conn, **kw):
         return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw)
 

Modified: trunk/neo/tests/functional/testClient.py
==============================================================================
--- trunk/neo/tests/functional/testClient.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/testClient.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -19,6 +19,7 @@ import os
 import unittest
 import transaction
 import ZODB
+from struct import pack, unpack
 from ZODB.FileStorage import FileStorage
 from ZODB.POSException import ConflictError
 from ZODB.tests.StorageTestBase import zodb_pickle
@@ -300,6 +301,31 @@ class ClientTests(NEOFunctionalTest):
             st3.tpc_finish(t3)
         self.runWithTimeout(10, test)
 
+    def testGreaterOIDSaved(self):
+        """
+            Store an object with an OID greater than the last generated by the
+            master. This OID must be intercepted at commit, used for next OID
+            generations and persistently saved on storage nodes.
+        """
+        self.neo = NEOCluster(['test_neo1'], replicas=0,
+            temp_dir=self.getTempDirectory())
+        neoctl = self.neo.getNEOCTL()
+        self.neo.start()
+        db1, conn1 = self.neo.getZODBConnection()
+        st1 = conn1._storage
+        t1 = transaction.Transaction()
+        rev = '\0' * 8
+        data = zodb_pickle(PObject())
+        my_oid = pack('!Q', 100000)
+        # store an object with this OID
+        st1.tpc_begin(t1)
+        st1.store(my_oid, rev, data, '', t1)
+        st1.tpc_vote(t1)
+        st1.tpc_finish(t1)
+        # request an oid, should be greater than mine
+        oid = st1.new_oid()
+        self.assertTrue(oid > my_oid)
+
 def test_suite():
     return unittest.makeSuite(ClientTests)
 

Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -80,12 +80,19 @@ class MasterClientHandlerTests(NeoTestBa
 
     def test_08_askNewOIDs(self):
         service = self.service
-        loid = self.app.loid
+        oid1, oid2 = self.getOID(1), self.getOID(2)
+        self.app.tm.setLastOID(oid1)
         # client call it
         client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
         conn = self.getFakeConnection(client_uuid, self.client_address)
+        for node in self.app.nm.getStorageList():
+            conn = self.getFakeConnection(node.getUUID(), node.getAddress())
+            node.setConnection(conn)
         service.askNewOIDs(conn, 1)
-        self.assertTrue(loid < self.app.loid)
+        self.assertTrue(self.app.tm.getLastOID() > oid1)
+        for node in self.app.nm.getStorageList():
+            conn = node.getConnection()
+            self.assertEquals(self.checkNotifyLastOID(conn, decode=True), (oid2,))
 
     def test_09_askFinishTransaction(self):
         service = self.service

Modified: trunk/neo/tests/master/testMasterApp.py
==============================================================================
--- trunk/neo/tests/master/testMasterApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testMasterApp.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -32,21 +32,6 @@ class MasterAppTests(NeoTestBase):
     def tearDown(self):
         NeoTestBase.tearDown(self)
 
-    def test_05_getNewOIDList(self):
-        # must raise as we don"t have one
-        self.assertEqual(self.app.loid, None)
-        self.app.loid = None
-        self.assertRaises(RuntimeError, self.app.getNewOIDList, 1)
-        # ask list
-        self.app.loid = p64(1)
-        oid_list = self.app.getNewOIDList(15)
-        self.assertEqual(len(oid_list), 15)
-        i = 2
-        # begin from 0, so generated oid from 1 to 15
-        for oid in oid_list:
-            self.assertEqual(u64(oid), i)
-            i+=1
-
     def test_06_broadcastNodeInformation(self):
         # defined some nodes to which data will be send
         master_uuid = self.getNewUUID()

Modified: trunk/neo/tests/master/testRecovery.py
==============================================================================
--- trunk/neo/tests/master/testRecovery.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testRecovery.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -93,26 +93,24 @@ class MasterRecoveryTests(NeoTestBase):
     def test_09_answerLastIDs(self):
         recovery = self.recovery
         uuid = self.identifyToMasterNode()
-        loid = self.app.loid = '\1' * 8
-        self.app.tm.setLastTID('\1' * 8)
-        ltid = self.app.tm.getLastTID()
-        self.app.pt.setID('\1' * 8)
-        lptid = self.app.pt.getID()
+        oid1 = self.getOID(1)
+        oid2 = self.getOID(2)
+        tid1 = self.getNextTID()
+        tid2 = self.getNextTID(tid1)
+        ptid1 = self.getPTID(1)
+        ptid2 = self.getPTID(2)
+        self.app.tm.setLastOID(oid1)
+        self.app.tm.setLastTID(tid1)
+        self.app.pt.setID(ptid1)
         # send information which are later to what PMN knows, this must update target node
         conn = self.getFakeConnection(uuid, self.storage_port)
-        new_ptid = unpack('!Q', lptid)[0]
-        new_ptid = pack('!Q', new_ptid + 1)
-        oid = unpack('!Q', loid)[0]
-        new_oid = pack('!Q', oid + 1)
-        upper, lower = unpack('!LL', ltid)
-        new_tid = pack('!LL', upper, lower + 10)
-        self.assertTrue(new_ptid > self.app.pt.getID())
-        self.assertTrue(new_oid > self.app.loid)
-        self.assertTrue(new_tid > self.app.tm.getLastTID())
-        recovery.answerLastIDs(conn, new_oid, new_tid, new_ptid)
-        self.assertEquals(new_oid, self.app.loid)
-        self.assertEquals(new_tid, self.app.tm.getLastTID())
-        self.assertEquals(new_ptid, recovery.target_ptid)
+        self.assertTrue(ptid2 > self.app.pt.getID())
+        self.assertTrue(oid2 > self.app.tm.getLastOID())
+        self.assertTrue(tid2 > self.app.tm.getLastTID())
+        recovery.answerLastIDs(conn, oid2, tid2, ptid2)
+        self.assertEquals(oid2, self.app.tm.getLastOID())
+        self.assertEquals(tid2, self.app.tm.getLastTID())
+        self.assertEquals(ptid2, recovery.target_ptid)
 
 
     def test_10_answerPartitionTable(self):

Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -126,8 +126,9 @@ class MasterStorageHandlerTests(NeoTestB
         # give a uuid
         conn = self.getFakeConnection(node.getUUID(), self.storage_address)
         ptid = self.app.pt.getID()
-        oid = self.app.loid = '\1' * 8
-        tid = '\1' * 8
+        oid = self.getOID(1)
+        tid = self.getNextTID()
+        self.app.tm.setLastOID(oid)
         self.app.tm.setLastTID(tid)
         service.askLastIDs(conn)
         packet = self.checkAnswerLastIDs(conn)
@@ -136,7 +137,6 @@ class MasterStorageHandlerTests(NeoTestB
         self.assertEqual(ltid, tid)
         self.assertEqual(lptid, ptid)
 
-
     def test_13_askUnfinishedTransactions(self):
         service = self.service
         node, conn = self.identifyToMasterNode()

Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -101,6 +101,19 @@ class testTransactionManager(NeoTestBase
         self.assertEqual(txnman.getPendingList(), [])
         self.assertFalse(txnman.hasPending())
 
+    def test_getNextOIDList(self):
+        txnman = TransactionManager()
+        # must raise as we don"t have one
+        self.assertEqual(txnman.getLastOID(), None)
+        self.assertRaises(RuntimeError, txnman.getNextOIDList, 1)
+        # ask list
+        txnman.setLastOID(self.getOID(1))
+        oid_list = txnman.getNextOIDList(15)
+        self.assertEqual(len(oid_list), 15)
+        # begin from 1, so generated oid from 2 to 16
+        for i, oid in zip(xrange(len(oid_list)), oid_list):
+            self.assertEqual(oid, self.getOID(i+2))
+
     def test_getNextTID(self):
         txnman = TransactionManager()
         # no previous TID





More information about the Neo-report mailing list