[Neo-report] r1771 vincent - /trunk/neo/dispatcher.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Feb 16 12:07:42 CET 2010
Author: vincent
Date: Tue Feb 16 12:07:41 2010
New Revision: 1771
Log:
Use explicit locking.
This fixes problems uncovered by current work on askStoreObject pipelining.
Modified:
trunk/neo/dispatcher.py
Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Tue Feb 16 12:07:41 2010
@@ -19,24 +19,14 @@
MARKER = []
EMPTY = {}
-# This class is thread safe:
-# - pop:
-# We don't modify outer mapping.
-# Inner mapping if accessed at most once by using a python primitive, so GIL
-# ensures atomicity.
-# - register:
-# We protect modification in outer mapping by a lock.
-# Inner mapping is accessed at most once by using a python primitive ('='
-# operator), so GIL ensures atomicity.
-# - unregister:
-# We protect modification in outer mapping by a lock.
-# Inner mapping access is done after detaching it from outer mapping in a
-# thread-safe manner, so access doesn't need to worry about concurency.
-# - registered:
-# Nothing is modified in any structure, so there is not much to worry about
-# concurency here. Note though that, by nature (read-only), this method can
-# cause concurency problems in caller, depending on how it interprets the
-# return value.
+def giant_lock(func):
+ def wrapped(self, *args, **kw):
+ self.lock_acquire()
+ try:
+ return func(self, *args, **kw)
+ finally:
+ self.lock_release()
+ return wrapped
class Dispatcher:
"""Register a packet, connection pair as expecting a response packet."""
@@ -44,9 +34,10 @@
def __init__(self):
self.message_table = {}
lock = Lock()
- self.message_table_lock_acquire = lock.acquire
- self.message_table_lock_release = lock.release
+ self.lock_acquire = lock.acquire
+ self.lock_release = lock.release
+ @giant_lock
def pop(self, conn, msg_id, default=MARKER):
"""Retrieve register-time provided payload."""
result = self.message_table.get(id(conn), EMPTY).pop(msg_id, default)
@@ -54,22 +45,19 @@
raise KeyError, (id(conn), msg_id)
return result
- def register(self, conn, msg_id, payload):
+ @giant_lock
+ def register(self, conn, msg_id, queue):
"""Register an expectation for a reply."""
- self.message_table_lock_acquire()
- try:
- message_table = self.message_table.setdefault(id(conn), {})
- finally:
- self.message_table_lock_release()
- message_table[msg_id] = payload
+ self.message_table.setdefault(id(conn), {})[msg_id] = queue
def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock
threads excepting responses from that connection """
- self.message_table_lock_acquire()
+ self.lock_acquire()
try:
message_table = self.message_table.pop(id(conn), EMPTY)
finally:
+ self.lock_release()
notified_set = set()
for queue in message_table.itervalues():
queue_id = id(queue)
More information about the Neo-report
mailing list