[Neo-report] r2763 jm - in /trunk/neo: client/ client/handlers/ tests/client/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri May 27 18:53:27 CEST 2011
Author: jm
Date: Fri May 27 18:53:27 2011
New Revision: 2763
Log:
Update implementation of HasLock processing on client side
Modified:
trunk/neo/client/app.py
trunk/neo/client/handlers/storage.py
trunk/neo/tests/client/testStorageHandler.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri May 27 18:53:27 2011
@@ -527,7 +527,7 @@ class Application(object):
else:
compression = 1
checksum = makeChecksum(compressed_data)
- on_timeout = OnTimeout(self.onStoreTimeout, ttid, oid)
+ on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid)
# Store object in tmp cache
data_dict = txn_context['data_dict']
if oid not in data_dict:
@@ -555,15 +555,15 @@ class Application(object):
self._waitAnyTransactionMessage(txn_context, False)
- def onStoreTimeout(self, conn, msg_id, ttid, oid):
+ def onStoreTimeout(self, conn, msg_id, txn_context, oid):
# NOTE: this method is called from poll thread, don't use
- # thread-specific value !
- # Stop expecting the timed-out store request.
- queue = self.dispatcher.forget(conn, msg_id)
+ # thread-specific value !
+ txn_context.setdefault('timeout_dict', {})[oid] = msg_id
# Ask the storage if someone locks the object.
- # Shorten timeout to react earlier to an unresponding storage.
- conn.ask(Packets.AskHasLock(ttid, oid), timeout=5, queue=queue)
- return True
+ # By sending a message with a smaller timeout,
+ # the connection will be kept open.
+ conn.ask(Packets.AskHasLock(txn_context['ttid'], oid),
+ timeout=5, queue=txn_context['queue'])
@profiler_decorator
def _handleConflicts(self, txn_context, tryToResolveConflict):
Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Fri May 27 18:53:27 2011
@@ -141,21 +141,32 @@ class StorageAnswersHandler(AnswerBaseHa
self.app.getHandlerData().update(object_tid_dict)
def answerHasLock(self, conn, oid, status):
+ store_msg_id = self.app.getHandlerData()['timeout_dict'].pop(oid)
if status == LockState.GRANTED_TO_OTHER:
+ # Stop expecting the timed-out store request.
+ self.app.dispatcher.forget(conn, store_msg_id)
# Object is locked by another transaction, and we have waited until
# timeout. To avoid a deadlock, abort current transaction (we might
# be locking objects the other transaction is waiting for).
raise ConflictError, 'Lock wait timeout for oid %s on %r' % (
dump(oid), conn)
- elif status == LockState.GRANTED:
- neo.lib.logging.info('Store of oid %s was successful, but after ' \
- 'timeout.', dump(oid))
- # XXX: Not sure what to do in this case yet, for now do nothing.
- else:
- # Nobody has the lock, although we asked storage to lock. This
- # means there is a software bug somewhere.
- # XXX: Not sure what to do in this case yet
- raise NotImplementedError
+ # HasLock design required that storage is multi-threaded so that
+ # it can answer to AskHasLock while processing store resquests.
+ # This means that the 2 cases (granted to us or nobody) are legitimate,
+ # either because it gave us the lock but is/was slow to store our data,
+ # or because the storage took a lot of time processing a previous
+ # store (and did not even considered our lock request).
+ # XXX: But storage nodes are still mono-threaded, so they should
+ # only answer with GRANTED_TO_OTHER (if they reply!), except
+ # maybe in very rare cases of race condition. Only log for now.
+ # This also means that most of the time, if the storage is slow
+ # to process some store requests, HasLock will timeout in turn
+ # and the connector will be closed.
+ # Anyway, it's not clear that HasLock requests are useful.
+ # Are store requests potentially long to process ? If not,
+ # we should simply raise a ConflictError on store timeout.
+ neo.lib.logging.info('Store of oid %s delayed (storage overload ?)',
+ dump(oid))
def alreadyPendingError(self, conn, message):
pass
Modified: trunk/neo/tests/client/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] Fri May 27 18:53:27 2011
@@ -269,17 +269,6 @@ class StorageAnswerHandlerTests(NeoUnitT
oid2: [tid2, tid3],
})
- def test_answerHasLock(self):
- uuid = self.getNewUUID()
- conn = self.getFakeConnection(uuid=uuid)
- oid = self.getOID(0)
-
- self.assertRaises(ConflictError, self.handler.answerHasLock, conn, oid,
- LockState.GRANTED_TO_OTHER)
- # XXX: Just check that this doesn't raise for the moment.
- self.handler.answerHasLock(conn, oid, LockState.GRANTED)
- # TODO: Test LockState.NOT_LOCKED case when implemented.
-
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list