[Neo-report] r2109 vincent - in /trunk/neo: ./ client/ client/handlers/ storage/handlers/ ...
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri May 14 01:30:54 CEST 2010
Author: vincent
Date: Fri May 14 01:30:53 2010
New Revision: 2109
Log:
Solve store deadlocks.
Store deadlocks can happen when the order in which storage nodes receive
object data (and hence grant write locks for those objects) is not
consistent among clients. So when a store times out, it may be because
another transaction got the store lock on this storage and object, while
that other transaction is itself waiting for locks we hold on other storage
nodes or objects.
When in doubt, report a conflict to abort the current transaction, in the
hope that the other transaction succeeds.
Modified:
trunk/neo/client/app.py
trunk/neo/client/handlers/storage.py
trunk/neo/handler.py
trunk/neo/protocol.py
trunk/neo/storage/handlers/client.py
trunk/neo/tests/client/testStorageHandler.py
trunk/neo/tests/storage/testClientHandler.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -34,7 +34,7 @@
from neo.event import EventManager
from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock
-from neo.connection import MTClientConnection
+from neo.connection import MTClientConnection, OnTimeout
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError
@@ -581,6 +581,7 @@
checksum = makeChecksum(compressed_data)
p = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, self.local_var.tid)
+ on_timeout = OnTimeout(self.onStoreTimeout, oid, self.local_var.tid)
# Store object in tmp cache
self.local_var.data_dict[oid] = data
# Store data on each node
@@ -592,12 +593,26 @@
if conn is None:
continue
try:
- conn.ask(p)
+ conn.ask(p, on_timeout=on_timeout)
except ConnectionClosed:
continue
self._waitAnyMessage(False)
return None
+
+ def onStoreTimeout(self, conn, msg_id, oid, tid):
+ """
+ Handle the timeout of a store of oid, sent on conn for transaction tid.
+ tid is bound when the store is sent rather than read from local_var
+ here, because this callback may not run in the thread that stored.
+ Returns True, presumably to tell the connection the timeout is handled.
+ """
+ # Ask the storage if someone locks the object (AskHasLock takes tid, oid).
+ # Shorten timeout to react earlier to an unresponding storage.
+ conn.ask(Packets.AskHasLock(tid, oid), timeout=5)
+ # Stop expecting the timed-out store request.
+ self.dispatcher.forget(conn, msg_id)
+ return True
@profiler_decorator
def _handleConflicts(self, tryToResolveConflict):
Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -16,10 +16,11 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from ZODB.TimeStamp import TimeStamp
+from ZODB.POSException import ConflictError
from neo import logging
from neo.client.handlers import BaseHandler, AnswerBaseHandler
-from neo.protocol import NodeTypes, ProtocolError
+from neo.protocol import NodeTypes, ProtocolError, LockState
from neo.util import dump
from neo.client.exception import NEOStorageError
@@ -132,3 +133,20 @@
for oid in oid_list:
data_dict[oid] = ''
+ def answerHasLock(self, conn, oid, status): # reply to AskHasLock sent by onStoreTimeout
+ if status == LockState.GRANTED_TO_OTHER:
+ # Object is locked by another transaction, and we have waited until
+ # timeout. To avoid a deadlock, abort current transaction (we might
+ # be locking objects the other transaction is waiting for).
+ raise ConflictError, 'Lock wait timeout for oid %s on %r' % (
+ dump(oid), conn.getNode())
+ elif status == LockState.GRANTED:
+ logging.info('Store of oid %s was successful, but after timeout.',
+ dump(oid))
+ # XXX: Not sure what to do in this case yet, for now do nothing.
+ else:
+ # Nobody has the lock, although we asked storage to lock. This
+ # means there is a software bug somewhere.
+ # XXX: Not sure what to do in this case yet
+ raise NotImplementedError # NOTE(review): a message with oid/conn would aid debugging
+
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -330,6 +330,12 @@
raise UnexpectedPacketError
def answerUndoTransaction(self, conn, oid_list, error_oid_list, conflict_oid_list):
+ raise UnexpectedPacketError
+
+ def askHasLock(self, conn, tid, oid): # conn first, like every other packet handler
+ raise UnexpectedPacketError
+
+ def answerHasLock(self, conn, oid, status): # conn first, like every other packet handler
raise UnexpectedPacketError
# Error packet handlers.
@@ -437,6 +443,8 @@
d[Packets.NotifyReplicationDone] = self.notifyReplicationDone
d[Packets.AskUndoTransaction] = self.askUndoTransaction
d[Packets.AnswerUndoTransaction] = self.answerUndoTransaction
+ d[Packets.AskHasLock] = self.askHasLock
+ d[Packets.AnswerHasLock] = self.answerHasLock
return d
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -77,6 +77,12 @@
DISCARDED = Enum.Item(4)
CellStates = CellStates()
+class LockState(Enum): # write-lock status of an object, carried by AnswerHasLock
+ NOT_LOCKED = Enum.Item(1) # no transaction holds the object's write lock
+ GRANTED = Enum.Item(2) # held by the transaction that asked
+ GRANTED_TO_OTHER = Enum.Item(3) # held by another transaction
+LockState = LockState()
+
# used for logging
node_state_prefix_dict = {
NodeStates.RUNNING: 'R',
@@ -158,6 +164,13 @@
raise PacketMalformedError('invalid error code %d' %
original_error_code)
return error_code
+
+def _decodeLockState(original_lock_state): # wire integer -> LockState item; rejects unknown values
+ lock_state = LockState.get(original_lock_state)
+ if lock_state is None:
+ raise PacketMalformedError('invalid lock state %d' % (
+ original_lock_state, ))
+ return lock_state
def _decodeAddress(address):
if address == '\0' * 6:
@@ -1546,6 +1559,30 @@
for _ in xrange(some_list_len):
append(read(OID_LEN))
return (oid_list, error_oid_list, conflict_oid_list)
+
+class AskHasLock(Packet):
+ """
+ Ask a storage whether oid is write-locked, and if so, by tid or by another transaction.
+ C -> S
+ """
+ def _encode(self, tid, oid): # two 8-byte fields; oid reuses the TID encoding
+ return _encodeTID(tid) + _encodeTID(oid)
+
+ def _decode(self, body):
+ return (_decodeTID(body[:8]), _decodeTID(body[8:]))
+class AnswerHasLock(Packet):
+ """
+ Answer which LockState the requested object is in (S -> C).
+ """
+ _header_format = '!8sH' # 8-byte oid, then the LockState value as an unsigned short
+
+ def _encode(self, oid, state):
+ return pack(self._header_format, oid, state)
+
+ def _decode(self, body):
+ oid, state = unpack(self._header_format, body)
+ return (oid, _decodeLockState(state))
class Error(Packet):
"""
@@ -1770,6 +1807,10 @@
0x0033,
AskUndoTransaction,
AnswerUndoTransaction)
+ AskHasLock, AnswerHasLock = register(
+ 0x0034,
+ AskHasLock,
+ AnswerHasLock)
# build a "singleton"
Packets = PacketRegistry()
Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import protocol
-from neo.protocol import Packets
+from neo.protocol import Packets, LockState
from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.storage.transactions import ConflictError, DelayedError
import time
@@ -124,3 +124,13 @@
conn.answer(Packets.AnswerUndoTransaction(oid_list, error_oid_list,
conflict_oid_list))
+ def askHasLock(self, conn, tid, oid): # report who holds oid's write lock, relative to tid
+ locking_tid = self.app.tm.getLockingTID(oid)
+ if locking_tid is None:
+ state = LockState.NOT_LOCKED
+ elif locking_tid == tid: # tid was decoded from the packet: compare values, not identity
+ state = LockState.GRANTED
+ else:
+ state = LockState.GRANTED_TO_OTHER
+ conn.answer(Packets.AnswerHasLock(oid, state))
+
Modified: trunk/neo/tests/client/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -18,10 +18,11 @@
import unittest
from mock import Mock
from neo.tests import NeoTestBase
-from neo.protocol import NodeTypes
+from neo.protocol import NodeTypes, LockState
from neo.client.handlers.storage import StorageBootstrapHandler, \
StorageAnswersHandler
from neo.client.exception import NEOStorageError
+from ZODB.POSException import ConflictError
MARKER = []
@@ -257,6 +258,17 @@
self.assertEqual(undo_error_oid_list, [oid_2])
self.assertEqual(data_dict, {oid_1: ''})
+ def test_answerHasLock(self):
+ uuid = self.getNewUUID()
+ conn = self.getFakeConnection(uuid=uuid)
+ oid = self.getOID(0)
+
+ self.assertRaises(ConflictError, self.handler.answerHasLock, conn, oid, # locked by another txn: abort
+ LockState.GRANTED_TO_OTHER)
+ # XXX: Just check that this doesn't raise for the moment.
+ self.handler.answerHasLock(conn, oid, LockState.GRANTED)
+ # TODO: Test LockState.NOT_LOCKED case when implemented.
+
if __name__ == '__main__':
unittest.main()
Modified: trunk/neo/tests/storage/testClientHandler.py
==============================================================================
--- trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -24,7 +24,7 @@
from neo.storage.handlers.client import ClientOperationHandler
from neo.protocol import INVALID_PARTITION
from neo.protocol import INVALID_TID, INVALID_OID, INVALID_SERIAL
-from neo.protocol import Packets
+from neo.protocol import Packets, LockState
class StorageClientHandlerTests(NeoTestBase):
@@ -277,5 +277,24 @@
self.assertEqual(oid_list_2, [oid_2])
self.assertEqual(oid_list_3, [oid_3])
+ def test_askHasLock(self):
+ tid_1 = self.getNextTID()
+ tid_2 = self.getNextTID()
+ oid = self.getNextTID()
+ def getLockingTID(oid): # returns the loop's current locking_tid (late binding)
+ return locking_tid
+ self.app.tm.getLockingTID = getLockingTID
+ for locking_tid, status in (
+ (None, LockState.NOT_LOCKED),
+ (tid_1[:4] + tid_1[4:], LockState.GRANTED), # equal to tid_1 but a distinct object, as after decoding
+ (tid_2, LockState.GRANTED_TO_OTHER),
+ ):
+ conn = self._getConnection()
+ self.operation.askHasLock(conn, tid_1, oid)
+ p_oid, p_status = self.checkAnswerPacket(conn,
+ Packets.AnswerHasLock, decode=True)
+ self.assertEqual(oid, p_oid)
+ self.assertEqual(status, p_status)
+
if __name__ == "__main__":
unittest.main()
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Fri May 14 01:30:53 2010
@@ -17,7 +17,7 @@
import unittest
from neo.protocol import NodeTypes, NodeStates, CellStates, ClusterStates
-from neo.protocol import ErrorCodes, Packets, Errors
+from neo.protocol import ErrorCodes, Packets, Errors, LockState
from neo.protocol import INVALID_TID
from neo.tests import NeoTestBase
@@ -589,6 +589,17 @@
p = Packets.AnswerPartitionList(ptid, row_list)
self.assertEqual(p.decode(), (ptid, row_list))
+ def test_AskHasLock(self):
+ tid = self.getNextTID()
+ oid = self.getNextTID()
+ p = Packets.AskHasLock(tid, oid)
+ self.assertEqual(p.decode(), (tid, oid)) # (tid, oid) must survive an encode/decode round trip
+
+ def test_AnswerHasLock(self):
+ oid = self.getNextTID()
+ for lock_state in LockState.itervalues(): # every LockState value must round-trip
+ p = Packets.AnswerHasLock(oid, lock_state)
+ self.assertEqual(p.decode(), (oid, lock_state))
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list