[Neo-report] r1771 vincent - /trunk/neo/dispatcher.py

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Feb 16 12:07:42 CET 2010


Author: vincent
Date: Tue Feb 16 12:07:41 2010
New Revision: 1771

Log:
Use explicit locking.

This fixes problems uncovered by current work on askStoreObject pipelining.

Modified:
    trunk/neo/dispatcher.py

Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Tue Feb 16 12:07:41 2010
@@ -19,24 +19,14 @@
 MARKER = []
 EMPTY = {}
 
-# This class is thread safe:
-# - pop:
-#   We don't modify outer mapping.
-#   Inner mapping is accessed at most once by using a python primitive, so GIL
-#   ensures atomicity.
-# - register:
-#   We protect modification in outer mapping by a lock.
-#   Inner mapping is accessed at most once by using a python primitive ('='
-#   operator), so GIL ensures atomicity.
-# - unregister:
-#   We protect modification in outer mapping by a lock.
-#   Inner mapping access is done after detaching it from outer mapping in a
-#   thread-safe manner, so access doesn't need to worry about concurrency.
-# - registered:
-#   Nothing is modified in any structure, so there is not much to worry about
-#   concurrency here. Note though that, by nature (read-only), this method can
-#   cause concurrency problems in caller, depending on how it interprets the
-#   return value.
+def giant_lock(func):
+    def wrapped(self, *args, **kw):
+        self.lock_acquire()
+        try:
+            return func(self, *args, **kw)
+        finally:
+            self.lock_release()
+    return wrapped
 
 class Dispatcher:
     """Register a packet, connection pair as expecting a response packet."""
@@ -44,9 +34,10 @@
     def __init__(self):
         self.message_table = {}
         lock = Lock()
-        self.message_table_lock_acquire = lock.acquire
-        self.message_table_lock_release = lock.release
+        self.lock_acquire = lock.acquire
+        self.lock_release = lock.release
 
+    @giant_lock
     def pop(self, conn, msg_id, default=MARKER):
         """Retrieve register-time provided payload."""
         result = self.message_table.get(id(conn), EMPTY).pop(msg_id, default)
@@ -54,22 +45,19 @@
             raise KeyError, (id(conn), msg_id)
         return result
 
-    def register(self, conn, msg_id, payload):
+    @giant_lock
+    def register(self, conn, msg_id, queue):
         """Register an expectation for a reply."""
-        self.message_table_lock_acquire()
-        try:
-            message_table = self.message_table.setdefault(id(conn), {})
-        finally:
-            self.message_table_lock_release()
-        message_table[msg_id] = payload
+        self.message_table.setdefault(id(conn), {})[msg_id] = queue
 
     def unregister(self, conn):
         """ Unregister a connection and put fake packet in queues to unlock
         threads expecting responses from that connection """
-        self.message_table_lock_acquire()
+        self.lock_acquire()
         try:
             message_table = self.message_table.pop(id(conn), EMPTY)
         finally:
+            self.lock_release()
         notified_set = set()
         for queue in message_table.itervalues():
             queue_id = id(queue)





More information about the Neo-report mailing list