[Neo-report] r2478 vincent - in /trunk/neo: ./ client/ client/handlers/ master/ master/handlers/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Dec 7 22:25:25 CET 2010
Author: vincent
Date: Tue Dec 7 22:25:24 2010
New Revision: 2478
Log:
Implement lastTransaction.
Modified:
trunk/neo/client/app.py
trunk/neo/client/handlers/master.py
trunk/neo/handler.py
trunk/neo/master/app.py
trunk/neo/master/handlers/client.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/recovery.py
trunk/neo/protocol.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -124,6 +124,7 @@ class ThreadContext(object):
'undo_object_tid_dict': {},
'involved_nodes': set(),
'barrier_done': False,
+ 'last_transaction': None,
}
@@ -1205,9 +1206,8 @@ class Application(object):
return Iterator(self, start, stop)
def lastTransaction(self):
- # XXX: this doesn't consider transactions created by other clients,
- # should ask the primary master
- return self.local_var.tid
+ self._askPrimary(Packets.AskLastCommittedTID())
+ return self.local_var.last_transaction
def abortVersion(self, src, transaction):
if transaction is not self.local_var.txn:
Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -175,3 +175,6 @@ class PrimaryAnswersHandler(AnswerBaseHa
if not status:
raise NEOStorageError('Already packing')
+ def answerLastTransaction(self, conn, ltid):
+ self.app.local_var.last_transaction = ltid
+
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -324,7 +324,7 @@ class EventHandler(object):
def notifyReplicationDone(self, conn, offset):
raise UnexpectedPacketError
- def askObjectUndoSerial(self, conn, tid, undone_tid, oid_list):
+ def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
raise UnexpectedPacketError
def answerObjectUndoSerial(self, conn, object_tid_dict):
@@ -366,6 +366,12 @@ class EventHandler(object):
def notifyReady(self, conn):
raise UnexpectedPacketError
+ def askLastTransaction(self, conn):
+ raise UnexpectedPacketError
+
+ def answerLastTransaction(self, conn, tid):
+ raise UnexpectedPacketError
+
# Error packet handlers.
def error(self, conn, code, message):
@@ -484,6 +490,8 @@ class EventHandler(object):
d[Packets.AskCheckSerialRange] = self.askCheckSerialRange
d[Packets.AnswerCheckSerialRange] = self.answerCheckSerialRange
d[Packets.NotifyReady] = self.notifyReady
+ d[Packets.AskLastTransaction] = self.askLastTransaction
+ d[Packets.AnswerLastTransaction] = self.answerLastTransaction
return d
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -20,7 +20,7 @@ import os, sys
from time import time
from neo import protocol
-from neo.protocol import UUID_NAMESPACES
+from neo.protocol import UUID_NAMESPACES, ZERO_TID
from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.node import NodeManager
from neo.event import EventManager
@@ -41,6 +41,8 @@ from neo.live_debug import register as r
class Application(object):
"""The master node application."""
packing = None
+ # Latest completely committed TID
+ last_transaction = ZERO_TID
def __init__(self, config):
@@ -559,6 +561,14 @@ class Application(object):
neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
return (uuid, node, state, handler, node_ctor)
+ def getLastTransaction(self):
+ return self.last_transaction
+
+ def setLastTransaction(self, tid):
+ ltid = self.last_transaction
+ assert tid >= ltid, (tid, ltid)
+ self.last_transaction = tid
+
def setStorageNotReady(self, uuid):
self.storage_readiness.discard(uuid)
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -104,3 +104,7 @@ class ClientServiceHandler(MasterHandler
else:
conn.answer(Packets.AnswerPack(False))
+ def askLastTransaction(self, conn):
+ conn.answer(Packets.AnswerLastTransaction(
+ self.app.getLastTransaction()))
+
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -106,6 +106,7 @@ class StorageServiceHandler(BaseServiceH
# remove transaction from manager
tm.remove(tid)
+ app.setLastTransaction(tid)
def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID())
Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -80,6 +80,7 @@ class RecoveryManager(MasterHandler):
node.setPending()
self.app.broadcastNodesInformation(refused_node_set)
+ self.app.setLastTransaction(self.app.tm.getLastTID())
neo.logging.debug('cluster starts with loid=%s and this partition ' \
'table :', dump(self.app.tm.getLastOID()))
self.app.pt.log()
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -1695,6 +1695,24 @@ class AnswerCheckSerialRange(Packet):
# serial_checksum, max_serial
return unpack(self._header_format, body)
+class AskLastTransaction(Packet):
+ """
+ Ask last committed TID.
+ C -> M
+ """
+ pass
+
+class AnswerLastTransaction(Packet):
+ """
+ Answer last committed TID.
+ M -> C
+ """
+ def _encode(self, tid):
+ return tid
+
+ def _decode(self, body):
+ return (body, )
+
class NotifyReady(Packet):
"""
Notify that node is ready to serve requests.
@@ -1970,6 +1988,11 @@ class PacketRegistry(dict):
AnswerCheckSerialRange,
)
NotifyReady = register(0x003B, NotifyReady)
+ AskLastTransaction, AnswerLastTransaction = register(
+ 0x003C,
+ AskLastTransaction,
+ AnswerLastTransaction,
+ )
# build a "singleton"
Packets = PacketRegistry()
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Dec 7 22:25:24 2010
@@ -693,6 +693,15 @@ class ProtocolTests(NeoUnitTestBase):
p = Packets.NotifyReady()
self.assertEqual(tuple(), p.decode())
+ def test_AskLastTransaction(self):
+ Packets.AskLastTransaction()
+
+ def test_AnswerLastTransaction(self):
+ tid = self.getNextTID()
+ p = Packets.AnswerLastTransaction(tid)
+ ptid = p.decode()[0]
+ self.assertEqual(ptid, tid)
+
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list