[Neo-report] r2109 vincent - in /trunk/neo: ./ client/ client/handlers/ storage/handlers/ ...

nobody at svn.erp5.org nobody at svn.erp5.org
Fri May 14 01:30:54 CEST 2010


Author: vincent
Date: Fri May 14 01:30:53 2010
New Revision: 2109

Log:
Solve store deadlocks.

Store deadlocks might happen when the order in which storage nodes receive
object data (and hence grant write locks for those objects) is not
consistent among clients. So when a store times out, it might be because
another transaction got the store lock on this storage and object, while
that transaction might itself be waiting for locks we got on other storage
nodes or objects.
When in doubt, report a conflict to abort the current transaction, in the
hope that the other transaction will succeed.

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/handlers/storage.py
    trunk/neo/handler.py
    trunk/neo/protocol.py
    trunk/neo/storage/handlers/client.py
    trunk/neo/tests/client/testStorageHandler.py
    trunk/neo/tests/storage/testClientHandler.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -34,7 +34,7 @@
 from neo.event import EventManager
 from neo.util import makeChecksum as real_makeChecksum, dump
 from neo.locking import Lock
-from neo.connection import MTClientConnection
+from neo.connection import MTClientConnection, OnTimeout
 from neo.node import NodeManager
 from neo.connector import getConnectorHandler
 from neo.client.exception import NEOStorageError
@@ -581,6 +581,7 @@
         checksum = makeChecksum(compressed_data)
         p = Packets.AskStoreObject(oid, serial, compression,
                  checksum, compressed_data, self.local_var.tid)
+        on_timeout = OnTimeout(self.onStoreTimeout, oid)
         # Store object in tmp cache
         self.local_var.data_dict[oid] = data
         # Store data on each node
@@ -592,12 +593,20 @@
             if conn is None:
                 continue
             try:
-                conn.ask(p)
+                conn.ask(p, on_timeout=on_timeout)
             except ConnectionClosed:
                 continue
 
         self._waitAnyMessage(False)
         return None
+
+    def onStoreTimeout(self, conn, msg_id, oid):
+        # Ask the storage if someone locks the object.
+        # Shorten timeout to react earlier to an unresponding storage.
+        conn.ask(Packets.AskHasLock(oid), timeout=5)
+        # Stop expecting the timed-out store request.
+        self.dispatcher.forget(conn, msg_id)
+        return True
 
     @profiler_decorator
     def _handleConflicts(self, tryToResolveConflict):

Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -16,10 +16,11 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 from ZODB.TimeStamp import TimeStamp
+from ZODB.POSException import ConflictError
 
 from neo import logging
 from neo.client.handlers import BaseHandler, AnswerBaseHandler
-from neo.protocol import NodeTypes, ProtocolError
+from neo.protocol import NodeTypes, ProtocolError, LockState
 from neo.util import dump
 from neo.client.exception import NEOStorageError
 
@@ -132,3 +133,20 @@
         for oid in oid_list:
             data_dict[oid] = ''
 
+    def answerHasLock(self, conn, oid, status):
+        if status == LockState.GRANTED_TO_OTHER:
+            # Object is locked by another transaction, and we have waited until
+            # timeout. To avoid a deadlock, abort current transaction (we might
+            # be locking objects the other transaction is waiting for).
+            raise ConflictError, 'Lock wait timeout for oid %s on %r' % (
+                dump(oid), conn.getNode())
+        elif status == LockState.GRANTED:
+            logging.info('Store of oid %s was successful, but after timeout.',
+                dump(oid))
+            # XXX: Not sure what to do in this case yet, for now do nothing.
+        else:
+            # Nobody has the lock, although we asked storage to lock. This
+            # means there is a software bug somewhere.
+            # XXX: Not sure what to do in this case yet
+            raise NotImplementedError
+

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -330,6 +330,12 @@
         raise UnexpectedPacketError
 
     def answerUndoTransaction(self, conn, oid_list, error_oid_list, conflict_oid_list):
+        raise UnexpectedPacketError
+
+    def askHasLock(self, tid, oid):
+        raise UnexpectedPacketError
+
+    def answerHasLock(self, oid, status):
         raise UnexpectedPacketError
 
     # Error packet handlers.
@@ -437,6 +443,8 @@
         d[Packets.NotifyReplicationDone] = self.notifyReplicationDone
         d[Packets.AskUndoTransaction] = self.askUndoTransaction
         d[Packets.AnswerUndoTransaction] = self.answerUndoTransaction
+        d[Packets.AskHasLock] = self.askHasLock
+        d[Packets.AnswerHasLock] = self.answerHasLock
 
         return d
 

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -77,6 +77,12 @@
     DISCARDED = Enum.Item(4)
 CellStates = CellStates()
 
+class LockState(Enum):
+    NOT_LOCKED = Enum.Item(1)
+    GRANTED = Enum.Item(2)
+    GRANTED_TO_OTHER = Enum.Item(3)
+LockState = LockState()
+
 # used for logging
 node_state_prefix_dict = {
     NodeStates.RUNNING: 'R',
@@ -158,6 +164,13 @@
         raise PacketMalformedError('invalid error code %d' %
                 original_error_code)
     return error_code
+
+def _decodeLockState(original_lock_state):
+    lock_state = LockState.get(original_lock_state)
+    if lock_state is None:
+        raise PacketMalformedError('invalid lock state %d' % (
+            original_lock_state, ))
+    return lock_state
 
 def _decodeAddress(address):
     if address == '\0' * 6:
@@ -1546,6 +1559,30 @@
             for _ in xrange(some_list_len):
                 append(read(OID_LEN))
         return (oid_list, error_oid_list, conflict_oid_list)
+
+class AskHasLock(Packet):
+    """
+    Ask a storage is oid is locked by another transaction.
+    C -> S
+    """
+    def _encode(self, tid, oid):
+        return _encodeTID(tid) + _encodeTID(oid)
+
+    def _decode(self, body):
+        return (_decodeTID(body[:8]), _decodeTID(body[8:]))
+
+class AnswerHasLock(Packet):
+    """
+    Answer whether a transaction holds the write lock for requested object.
+    """
+    _header_format = '!8sH'
+
+    def _encode(self, oid, state):
+        return pack(self._header_format, oid, state)
+
+    def _decode(self, body):
+        oid, state = unpack(self._header_format, body)
+        return (oid, _decodeLockState(state))
 
 class Error(Packet):
     """
@@ -1770,6 +1807,10 @@
             0x0033,
             AskUndoTransaction,
             AnswerUndoTransaction)
+    AskHasLock, AnswerHasLock = register(
+            0x0034,
+            AskHasLock,
+            AnswerHasLock)
 
 # build a "singleton"
 Packets = PacketRegistry()

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -16,7 +16,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 from neo import protocol
-from neo.protocol import Packets
+from neo.protocol import Packets, LockState
 from neo.storage.handlers import BaseClientAndStorageOperationHandler
 from neo.storage.transactions import ConflictError, DelayedError
 import time
@@ -124,3 +124,13 @@
         conn.answer(Packets.AnswerUndoTransaction(oid_list, error_oid_list,
             conflict_oid_list))
 
+    def askHasLock(self, conn, tid, oid):
+        locking_tid = self.app.tm.getLockingTID(oid)
+        if locking_tid is None:
+            state = LockState.NOT_LOCKED
+        elif locking_tid is tid:
+            state = LockState.GRANTED
+        else:
+            state = LockState.GRANTED_TO_OTHER
+        conn.answer(Packets.AnswerHasLock(oid, state))
+

Modified: trunk/neo/tests/client/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -18,10 +18,11 @@
 import unittest
 from mock import Mock
 from neo.tests import NeoTestBase
-from neo.protocol import NodeTypes
+from neo.protocol import NodeTypes, LockState
 from neo.client.handlers.storage import StorageBootstrapHandler, \
        StorageAnswersHandler
 from neo.client.exception import NEOStorageError
+from ZODB.POSException import ConflictError
 
 MARKER = []
 
@@ -257,6 +258,17 @@
         self.assertEqual(undo_error_oid_list, [oid_2])
         self.assertEqual(data_dict, {oid_1: ''})
 
+    def test_answerHasLock(self):
+        uuid = self.getNewUUID()
+        conn = self.getFakeConnection(uuid=uuid)
+        oid = self.getOID(0)
+
+        self.assertRaises(ConflictError, self.handler.answerHasLock, conn, oid,
+            LockState.GRANTED_TO_OTHER)
+        # XXX: Just check that this doesn't raise for the moment.
+        self.handler.answerHasLock(conn, oid, LockState.GRANTED)
+        # TODO: Test LockState.NOT_LOCKED case when implemented.
+
 if __name__ == '__main__':
     unittest.main()
 

Modified: trunk/neo/tests/storage/testClientHandler.py
==============================================================================
--- trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -24,7 +24,7 @@
 from neo.storage.handlers.client import ClientOperationHandler
 from neo.protocol import INVALID_PARTITION
 from neo.protocol import INVALID_TID, INVALID_OID, INVALID_SERIAL
-from neo.protocol import Packets
+from neo.protocol import Packets, LockState
 
 class StorageClientHandlerTests(NeoTestBase):
 
@@ -277,5 +277,24 @@
         self.assertEqual(oid_list_2, [oid_2])
         self.assertEqual(oid_list_3, [oid_3])
 
+    def test_askHasLock(self):
+        tid_1 = self.getNextTID()
+        tid_2 = self.getNextTID()
+        oid = self.getNextTID()
+        def getLockingTID(oid):
+            return locking_tid
+        self.app.tm.getLockingTID = getLockingTID
+        for locking_tid, status in (
+                    (None, LockState.NOT_LOCKED),
+                    (tid_1, LockState.GRANTED),
+                    (tid_2, LockState.GRANTED_TO_OTHER),
+                ):
+            conn = self._getConnection()
+            self.operation.askHasLock(conn, tid_1, oid)
+            p_oid, p_status = self.checkAnswerPacket(conn,
+                Packets.AnswerHasLock, decode=True)
+            self.assertEqual(oid, p_oid)
+            self.assertEqual(status, p_status)
+
 if __name__ == "__main__":
     unittest.main()

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -17,7 +17,7 @@
 
 import unittest
 from neo.protocol import NodeTypes, NodeStates, CellStates, ClusterStates
-from neo.protocol import ErrorCodes, Packets, Errors
+from neo.protocol import ErrorCodes, Packets, Errors, LockState
 from neo.protocol import INVALID_TID
 from neo.tests import NeoTestBase
 
@@ -589,6 +589,17 @@
         p = Packets.AnswerPartitionList(ptid, row_list)
         self.assertEqual(p.decode(), (ptid, row_list))
 
+    def test_AskHasLock(self):
+        tid = self.getNextTID()
+        oid = self.getNextTID()
+        p = Packets.AskHasLock(tid, oid)
+        self.assertEqual(p.decode(), (tid, oid))
+
+    def test_AnswerHasLock(self):
+        oid = self.getNextTID()
+        for lock_state in LockState.itervalues():
+            p = Packets.AnswerHasLock(oid, lock_state)
+            self.assertEqual(p.decode(), (oid, lock_state))
 
 if __name__ == '__main__':
     unittest.main()





More information about the Neo-report mailing list