[Neo-report] r2478 vincent - in /trunk/neo: ./ client/ client/handlers/ master/ master/handlers/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Dec 7 22:25:25 CET 2010


Author: vincent
Date: Tue Dec  7 22:25:24 2010
New Revision: 2478

Log:
Implement lastTransaction.

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/handlers/master.py
    trunk/neo/handler.py
    trunk/neo/master/app.py
    trunk/neo/master/handlers/client.py
    trunk/neo/master/handlers/storage.py
    trunk/neo/master/recovery.py
    trunk/neo/protocol.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -124,6 +124,7 @@ class ThreadContext(object):
             'undo_object_tid_dict': {},
             'involved_nodes': set(),
             'barrier_done': False,
+            'last_transaction': None,
         }
 
 
@@ -1205,9 +1206,8 @@ class Application(object):
         return Iterator(self, start, stop)
 
     def lastTransaction(self):
-        # XXX: this doesn't consider transactions created by other clients,
-        #  should ask the primary master
-        return self.local_var.tid
+        self._askPrimary(Packets.AskLastCommittedTID())
+        return self.local_var.last_transaction
 
     def abortVersion(self, src, transaction):
         if transaction is not self.local_var.txn:

Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -175,3 +175,6 @@ class PrimaryAnswersHandler(AnswerBaseHa
         if not status:
             raise NEOStorageError('Already packing')
 
+    def answerLastTransaction(self, conn, ltid):
+        self.app.local_var.last_transaction = ltid
+

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -324,7 +324,7 @@ class EventHandler(object):
     def notifyReplicationDone(self, conn, offset):
         raise UnexpectedPacketError
 
-    def askObjectUndoSerial(self, conn, tid, undone_tid, oid_list):
+    def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
         raise UnexpectedPacketError
 
     def answerObjectUndoSerial(self, conn, object_tid_dict):
@@ -366,6 +366,12 @@ class EventHandler(object):
     def notifyReady(self, conn):
         raise UnexpectedPacketError
 
+    def askLastTransaction(self, conn):
+        raise UnexpectedPacketError
+
+    def answerLastTransaction(self, conn, tid):
+        raise UnexpectedPacketError
+
     # Error packet handlers.
 
     def error(self, conn, code, message):
@@ -484,6 +490,8 @@ class EventHandler(object):
         d[Packets.AskCheckSerialRange] = self.askCheckSerialRange
         d[Packets.AnswerCheckSerialRange] = self.answerCheckSerialRange
         d[Packets.NotifyReady] = self.notifyReady
+        d[Packets.AskLastTransaction] = self.askLastTransaction
+        d[Packets.AnswerLastTransaction] = self.answerLastTransaction
 
         return d
 

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -20,7 +20,7 @@ import os, sys
 from time import time
 
 from neo import protocol
-from neo.protocol import UUID_NAMESPACES
+from neo.protocol import UUID_NAMESPACES, ZERO_TID
 from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
 from neo.node import NodeManager
 from neo.event import EventManager
@@ -41,6 +41,8 @@ from neo.live_debug import register as r
 class Application(object):
     """The master node application."""
     packing = None
+    # Latest completely committed TID
+    last_transaction = ZERO_TID
 
     def __init__(self, config):
 
@@ -559,6 +561,14 @@ class Application(object):
             neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
         return (uuid, node, state, handler, node_ctor)
 
+    def getLastTransaction(self):
+        return self.last_transaction
+
+    def setLastTransaction(self, tid):
+        ltid = self.last_transaction
+        assert tid >= ltid, (tid, ltid)
+        self.last_transaction = tid
+
     def setStorageNotReady(self, uuid):
         self.storage_readiness.discard(uuid)
 

Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -104,3 +104,7 @@ class ClientServiceHandler(MasterHandler
         else:
             conn.answer(Packets.AnswerPack(False))
 
+    def askLastTransaction(self, conn):
+        conn.answer(Packets.AnswerLastTransaction(
+            self.app.getLastTransaction()))
+

Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -106,6 +106,7 @@ class StorageServiceHandler(BaseServiceH
 
         # remove transaction from manager
         tm.remove(tid)
+        app.setLastTransaction(tid)
 
     def notifyReplicationDone(self, conn, offset):
         node = self.app.nm.getByUUID(conn.getUUID())

Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -80,6 +80,7 @@ class RecoveryManager(MasterHandler):
             node.setPending()
         self.app.broadcastNodesInformation(refused_node_set)
 
+        self.app.setLastTransaction(self.app.tm.getLastTID())
         neo.logging.debug('cluster starts with loid=%s and this partition ' \
             'table :', dump(self.app.tm.getLastOID()))
         self.app.pt.log()

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -1695,6 +1695,24 @@ class AnswerCheckSerialRange(Packet):
         # serial_checksum, max_serial
         return unpack(self._header_format, body)
 
+class AskLastTransaction(Packet):
+    """
+    Ask last committed TID.
+    C -> M
+    """
+    pass
+
+class AnswerLastTransaction(Packet):
+    """
+    Answer last committed TID.
+    M -> C
+    """
+    def _encode(self, tid):
+        return tid
+
+    def _decode(self, body):
+        return (body, )
+
 class NotifyReady(Packet):
     """
     Notify that node is ready to serve requests.
@@ -1970,6 +1988,11 @@ class PacketRegistry(dict):
             AnswerCheckSerialRange,
             )
     NotifyReady = register(0x003B, NotifyReady)
+    AskLastTransaction, AnswerLastTransaction = register(
+            0x003C,
+            AskLastTransaction,
+            AnswerLastTransaction,
+            )
 
 # build a "singleton"
 Packets = PacketRegistry()

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Dec  7 22:25:24 2010
@@ -693,6 +693,15 @@ class ProtocolTests(NeoUnitTestBase):
         p = Packets.NotifyReady()
         self.assertEqual(tuple(), p.decode())
 
+    def test_AskLastTransaction(self):
+        Packets.AskLastTransaction()
+
+    def test_AnswerLastTransaction(self):
+        tid = self.getNextTID()
+        p = Packets.AnswerLastTransaction(tid)
+        ptid = p.decode()[0]
+        self.assertEqual(ptid, tid)
+
 if __name__ == '__main__':
     unittest.main()
 




More information about the Neo-report mailing list