[Neo-report] r2643 vincent - in /trunk/neo: client/ client/handlers/ tests/client/ tests/z...

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Jan 25 14:59:01 CET 2011


Author: vincent
Date: Tue Jan 25 14:59:01 2011
New Revision: 2643

Log:
Transactions must not be bound to the thread which created them.

Added:
    trunk/neo/client/container.py
Modified:
    trunk/neo/client/app.py
    trunk/neo/client/handlers/master.py
    trunk/neo/client/handlers/storage.py
    trunk/neo/client/pool.py
    trunk/neo/tests/client/testClientApp.py
    trunk/neo/tests/client/testMasterHandler.py
    trunk/neo/tests/client/testStorageHandler.py
    trunk/neo/tests/zodb/testBasic.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -15,10 +15,9 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-from thread import get_ident
 from cPickle import dumps, loads
 from zlib import compress as real_compress, decompress
-from neo.lib.locking import Queue, Empty
+from neo.lib.locking import Empty
 from random import shuffle
 import time
 import os
@@ -49,6 +48,7 @@ from neo.lib.util import u64, parseMaste
 from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
 from neo.lib.live_debug import register as registerLiveDebugger
 from neo.client.mq_index import RevisionIndex
+from neo.client.container import ThreadContainer, TransactionContainer
 
 if PROFILING_ENABLED:
     # Those functions require a "real" python function wrapper before they can
@@ -65,60 +65,6 @@ else:
     compress = real_compress
     makeChecksum = real_makeChecksum
 
-class ThreadContext(object):
-
-    def __init__(self):
-        super(ThreadContext, self).__setattr__('_threads_dict', {})
-
-    def __getThreadData(self):
-        thread_id = get_ident()
-        try:
-            result = self._threads_dict[thread_id]
-        except KeyError:
-            self.clear(thread_id)
-            result = self._threads_dict[thread_id]
-        return result
-
-    def __getattr__(self, name):
-        thread_data = self.__getThreadData()
-        try:
-            return thread_data[name]
-        except KeyError:
-            raise AttributeError, name
-
-    def __setattr__(self, name, value):
-        thread_data = self.__getThreadData()
-        thread_data[name] = value
-
-    def clear(self, thread_id=None):
-        if thread_id is None:
-            thread_id = get_ident()
-        thread_dict = self._threads_dict.get(thread_id)
-        if thread_dict is None:
-            queue = Queue(0)
-        else:
-            queue = thread_dict['queue']
-        self._threads_dict[thread_id] = {
-            'tid': None,
-            'txn': None,
-            'data_dict': {},
-            'data_list': [],
-            'object_base_serial_dict': {},
-            'object_serial_dict': {},
-            'object_stored_counter_dict': {},
-            'conflict_serial_dict': {},
-            'resolved_conflict_serial_dict': {},
-            'txn_voted': False,
-            'queue': queue,
-            'txn_info': 0,
-            'history': None,
-            'node_tids': {},
-            'asked_object': 0,
-            'undo_object_tid_dict': {},
-            'involved_nodes': set(),
-            'last_transaction': None,
-        }
-
 class Application(object):
     """The client node application."""
 
@@ -158,7 +104,8 @@ class Application(object):
         self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
         self.notifications_handler = master.PrimaryNotificationsHandler( self)
         # Internal attribute distinct between thread
-        self.local_var = ThreadContext()
+        self._thread_container = ThreadContainer()
+        self._txn_container = TransactionContainer()
         # Lock definition :
         # _load_lock is used to make loading and storing atomic
         lock = Lock()
@@ -185,6 +132,15 @@ class Application(object):
         self.compress = compress
         registerLiveDebugger(on_log=self.log)
 
+    def getHandlerData(self):
+        return self._thread_container.get()['answer']
+
+    def setHandlerData(self, data):
+        self._thread_container.get()['answer'] = data
+
+    def _getThreadQueue(self):
+        return self._thread_container.get()['queue']
+
     def log(self):
         self.em.log()
         self.nm.log()
@@ -222,7 +178,7 @@ class Application(object):
             conn.unlock()
 
     @profiler_decorator
-    def _waitAnyMessage(self, block=True):
+    def _waitAnyMessage(self, queue, block=True):
         """
           Handle all pending packets.
           block
@@ -230,7 +186,6 @@ class Application(object):
             received.
         """
         pending = self.dispatcher.pending
-        queue = self.local_var.queue
         get = queue.get
         _handlePacket = self._handlePacket
         while pending(queue):
@@ -247,39 +202,53 @@ class Application(object):
             except ConnectionClosed:
                 pass
 
+    def _waitAnyTransactionMessage(self, txn_context, block=True):
+        """
+        Just like _waitAnyMessage, but for per-transaction exchanges, rather
+        than per-thread.
+        """
+        queue = txn_context['queue']
+        self.setHandlerData(txn_context)
+        try:
+            self._waitAnyMessage(queue, block=block)
+        finally:
+            # Don't leave access to thread context, even if a raise happens.
+            self.setHandlerData(None)
+
     @profiler_decorator
-    def _waitMessage(self, target_conn, msg_id, handler=None):
-        """Wait for a message returned by the dispatcher in queues."""
-        get = self.local_var.queue.get
+    def _ask(self, conn, packet, handler=None):
+        self.setHandlerData(None)
+        queue = self._getThreadQueue()
+        msg_id = conn.ask(packet, queue=queue)
+        get = queue.get
         _handlePacket = self._handlePacket
         while True:
-            conn, packet = get(True)
-            is_forgotten = isinstance(packet, ForgottenPacket)
-            if target_conn is conn:
+            qconn, qpacket = get(True)
+            is_forgotten = isinstance(qpacket, ForgottenPacket)
+            if conn is qconn:
                 # check fake packet
-                if packet is None:
+                if qpacket is None:
                     raise ConnectionClosed
-                if msg_id == packet.getId():
+                if msg_id == qpacket.getId():
                     if is_forgotten:
                         raise ValueError, 'ForgottenPacket for an ' \
                             'explicitely expected packet.'
-                    _handlePacket(conn, packet, handler=handler)
+                    _handlePacket(qconn, qpacket, handler=handler)
                     break
-            if not is_forgotten and packet is not None:
-                _handlePacket(conn, packet)
+            if not is_forgotten and qpacket is not None:
+                _handlePacket(qconn, qpacket)
+        return self.getHandlerData()
 
     @profiler_decorator
     def _askStorage(self, conn, packet):
         """ Send a request to a storage node and process its answer """
-        msg_id = conn.ask(packet, queue=self.local_var.queue)
-        self._waitMessage(conn, msg_id, self.storage_handler)
+        return self._ask(conn, packet, handler=self.storage_handler)
 
     @profiler_decorator
     def _askPrimary(self, packet):
         """ Send a request to the primary master and process its answer """
-        conn = self._getMasterConnection()
-        msg_id = conn.ask(packet, queue=self.local_var.queue)
-        self._waitMessage(conn, msg_id, self.primary_handler)
+        return self._ask(self._getMasterConnection(), packet,
+            handler=self.primary_handler)
 
     @profiler_decorator
     def _getMasterConnection(self):
@@ -311,7 +280,6 @@ class Application(object):
         neo.lib.logging.debug('connecting to primary master...')
         ready = False
         nm = self.nm
-        queue = self.local_var.queue
         packet = Packets.AskPrimary()
         while not ready:
             # Get network connection to primary master
@@ -346,9 +314,8 @@ class Application(object):
                                   self.trying_master_node)
                     continue
                 try:
-                    msg_id = conn.ask(packet, queue=queue)
-                    self._waitMessage(conn, msg_id,
-                            handler=self.primary_bootstrap_handler)
+                    self._ask(conn, packet,
+                        handler=self.primary_bootstrap_handler)
                 except ConnectionClosed:
                     continue
                 # If we reached the primary master node, mark as connected
@@ -373,24 +340,19 @@ class Application(object):
             looked-up again.
         """
         neo.lib.logging.info('Initializing from master')
-        queue = self.local_var.queue
+        ask = self._ask
+        handler = self.primary_bootstrap_handler
         # Identify to primary master and request initial data
         p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid, None,
             self.name)
         while conn.getUUID() is None:
-            self._waitMessage(conn, conn.ask(p, queue=queue),
-                    handler=self.primary_bootstrap_handler)
+            ask(conn, p, handler=handler)
             if conn.getUUID() is None:
                 # Node identification was refused by master, it is considered
                 # as the primary as long as we are connected to it.
                 time.sleep(1)
-        if self.uuid is not None:
-            msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
-            self._waitMessage(conn, msg_id,
-                    handler=self.primary_bootstrap_handler)
-            msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
-            self._waitMessage(conn, msg_id,
-                    handler=self.primary_bootstrap_handler)
+        ask(conn, Packets.AskNodeInformation(), handler=handler)
+        ask(conn, Packets.AskPartitionTable(), handler=handler)
         return self.pt.operational()
 
     def registerDB(self, db, limit):
@@ -490,32 +452,27 @@ class Application(object):
 
     @profiler_decorator
     def _loadFromStorage(self, oid, at_tid, before_tid):
-        self.local_var.asked_object = 0
+        data = None
         packet = Packets.AskObject(oid, at_tid, before_tid)
         for node, conn in self.cp.iterateForObject(oid, readable=True):
             try:
-                self._askStorage(conn, packet)
+                noid, tid, next_tid, compression, checksum, data \
+                    = self._askStorage(conn, packet)
             except ConnectionClosed:
                 continue
 
-            # Check data
-            noid, tid, next_tid, compression, checksum, data \
-                = self.local_var.asked_object
-            if noid != oid:
-                # Oops, try with next node
-                neo.lib.logging.error('got wrong oid %s instead of %s from %s',
-                    noid, dump(oid), conn)
-                self.local_var.asked_object = -1
-                continue
-            elif checksum != makeChecksum(data):
+            if checksum != makeChecksum(data):
+                # Warning: see TODO file.
                 # Check checksum.
                 neo.lib.logging.error('wrong checksum from %s for oid %s',
                               conn, dump(oid))
-                self.local_var.asked_object = -1
+                data = None
                 continue
             break
-        if self.local_var.asked_object == -1:
-            raise NEOStorageError('inconsistent data')
+        if data is None:
+            # We didn't got any object from all storage node because of
+            # connection error
+            raise NEOStorageError('connection failure')
 
         # Uncompress data
         if compression:
@@ -547,30 +504,34 @@ class Application(object):
     @profiler_decorator
     def tpc_begin(self, transaction, tid=None, status=' '):
         """Begin a new transaction."""
+        txn_container = self._txn_container
         # First get a transaction, only one is allowed at a time
-        if self.local_var.txn is transaction:
+        if txn_container.get(transaction) is not None:
             # We already begin the same transaction
             raise StorageTransactionError('Duplicate tpc_begin calls')
-        if self.local_var.txn is not None:
-            raise NeoException, 'local_var is not clean in tpc_begin'
+        txn_context = txn_container.new(transaction)
         # use the given TID or request a new one to the master
-        self._askPrimary(Packets.AskBeginTransaction(tid))
-        if self.local_var.tid is None:
+        answer_ttid = self._askPrimary(Packets.AskBeginTransaction(tid))
+        if answer_ttid is None:
             raise NEOStorageError('tpc_begin failed')
-        assert tid in (None, self.local_var.tid), (tid, self.local_var.tid)
-        self.local_var.txn = transaction
+        assert tid in (None, answer_ttid), (tid, answer_ttid)
+        txn_context['txn'] = transaction
+        txn_context['ttid'] = answer_ttid
 
     @profiler_decorator
     def store(self, oid, serial, data, version, transaction):
         """Store object."""
-        if transaction is not self.local_var.txn:
+        txn_context = self._txn_container.get(transaction)
+        if txn_context is None:
             raise StorageTransactionError(self, transaction)
         neo.lib.logging.debug(
                         'storing oid %s serial %s', dump(oid), dump(serial))
-        self._store(oid, serial, data)
+        self._store(txn_context, oid, serial, data)
         return None
 
-    def _store(self, oid, serial, data, data_serial=None, unlock=False):
+    def _store(self, txn_context, oid, serial, data, data_serial=None,
+            unlock=False):
+        ttid = txn_context['ttid']
         if data is None:
             # This is some undo: either a no-data object (undoing object
             # creation) or a back-pointer to an earlier revision (going back to
@@ -589,33 +550,33 @@ class Application(object):
                 else:
                     compression = 1
         checksum = makeChecksum(compressed_data)
-        on_timeout = OnTimeout(self.onStoreTimeout, self.local_var.tid, oid)
+        on_timeout = OnTimeout(self.onStoreTimeout, ttid, oid)
         # Store object in tmp cache
-        local_var = self.local_var
-        data_dict = local_var.data_dict
+        data_dict = txn_context['data_dict']
         if oid not in data_dict:
-            local_var.data_list.append(oid)
+            txn_context['data_list'].append(oid)
         data_dict[oid] = data
         # Store data on each node
-        self.local_var.object_stored_counter_dict[oid] = {}
-        object_base_serial_dict = local_var.object_base_serial_dict
+        txn_context['object_stored_counter_dict'][oid] = {}
+        object_base_serial_dict = txn_context['object_base_serial_dict']
         if oid not in object_base_serial_dict:
             object_base_serial_dict[oid] = serial
-        self.local_var.object_serial_dict[oid] = serial
-        queue = self.local_var.queue
-        add_involved_nodes = self.local_var.involved_nodes.add
+        txn_context['object_serial_dict'][oid] = serial
+        queue = txn_context['queue']
+        involved_nodes = txn_context['involved_nodes']
+        add_involved_nodes = involved_nodes.add
         packet = Packets.AskStoreObject(oid, serial, compression,
-            checksum, compressed_data, data_serial, self.local_var.tid, unlock)
+            checksum, compressed_data, data_serial, ttid, unlock)
         for node, conn in self.cp.iterateForObject(oid, writable=True):
             try:
                 conn.ask(packet, on_timeout=on_timeout, queue=queue)
                 add_involved_nodes(node)
             except ConnectionClosed:
                 continue
-        if not self.local_var.involved_nodes:
+        if not involved_nodes:
             raise NEOStorageError("Store failed")
 
-        self._waitAnyMessage(False)
+        self._waitAnyTransactionMessage(txn_context, False)
 
     def onStoreTimeout(self, conn, msg_id, ttid, oid):
         # NOTE: this method is called from poll thread, don't use
@@ -628,17 +589,17 @@ class Application(object):
         return True
 
     @profiler_decorator
-    def _handleConflicts(self, tryToResolveConflict):
+    def _handleConflicts(self, txn_context, tryToResolveConflict):
         result = []
         append = result.append
-        local_var = self.local_var
         # Check for conflicts
-        data_dict = local_var.data_dict
-        object_base_serial_dict = local_var.object_base_serial_dict
-        object_serial_dict = local_var.object_serial_dict
-        conflict_serial_dict = local_var.conflict_serial_dict.copy()
-        local_var.conflict_serial_dict.clear()
-        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
+        data_dict = txn_context['data_dict']
+        object_base_serial_dict = txn_context['object_base_serial_dict']
+        object_serial_dict = txn_context['object_serial_dict']
+        conflict_serial_dict = txn_context['conflict_serial_dict'].copy()
+        txn_context['conflict_serial_dict'].clear()
+        resolved_conflict_serial_dict = txn_context[
+            'resolved_conflict_serial_dict']
         for oid, conflict_serial_set in conflict_serial_dict.iteritems():
             resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                 oid, set())
@@ -650,7 +611,6 @@ class Application(object):
                 continue
             serial = object_serial_dict[oid]
             data = data_dict[oid]
-            tid = local_var.tid
             resolved = False
             if conflict_serial == ZERO_TID:
                 # Storage refused us from taking object lock, to avoid a
@@ -665,12 +625,11 @@ class Application(object):
                 # object data again.
                 neo.lib.logging.info('Deadlock avoidance triggered on %r:%r',
                     dump(oid), dump(serial))
-                for store_oid, store_data in \
-                        local_var.data_dict.iteritems():
+                for store_oid, store_data in data_dict.iteritems():
                     store_serial = object_serial_dict[store_oid]
                     if store_data is None:
-                        self.checkCurrentSerialInTransaction(store_oid,
-                            store_serial)
+                        self._checkCurrentSerialInTransaction(txn_context,
+                            store_oid, store_serial)
                     else:
                         if store_data is '':
                             # Some undo
@@ -678,8 +637,8 @@ class Application(object):
                                 ' reliably work with undo, this must be '
                                 'implemented.')
                             break
-                        self._store(store_oid, store_serial, store_data,
-                            unlock=True)
+                        self._store(txn_context, store_oid, store_serial,
+                            store_data, unlock=True)
                 else:
                     resolved = True
             elif data is not None:
@@ -694,7 +653,7 @@ class Application(object):
                     # Base serial changes too, as we resolved a conflict
                     object_base_serial_dict[oid] = conflict_serial
                     # Try to store again
-                    self._store(oid, conflict_serial, new_data)
+                    self._store(txn_context, oid, conflict_serial, new_data)
                     append(oid)
                     resolved = True
                 else:
@@ -704,49 +663,51 @@ class Application(object):
             if not resolved:
                 # XXX: Is it really required to remove from data_dict ?
                 del data_dict[oid]
-                local_var.data_list.remove(oid)
+                txn_context['data_list'].remove(oid)
                 if data is None:
                     exc = ReadConflictError(oid=oid, serials=(conflict_serial,
                         serial))
                 else:
-                    exc = ConflictError(oid=oid, serials=(tid, serial),
-                        data=data)
+                    exc = ConflictError(oid=oid, serials=(txn_context['ttid'],
+                        serial), data=data)
                 raise exc
         return result
 
     @profiler_decorator
-    def waitResponses(self):
+    def waitResponses(self, queue, handler_data):
         """Wait for all requests to be answered (or their connection to be
         detected as closed)"""
-        queue = self.local_var.queue
         pending = self.dispatcher.pending
         _waitAnyMessage = self._waitAnyMessage
+        self.setHandlerData(handler_data)
         while pending(queue):
-            _waitAnyMessage()
+            _waitAnyMessage(queue)
 
     @profiler_decorator
-    def waitStoreResponses(self, tryToResolveConflict):
+    def waitStoreResponses(self, txn_context, tryToResolveConflict):
         result = []
         append = result.append
         resolved_oid_set = set()
         update = resolved_oid_set.update
-        local_var = self.local_var
-        tid = local_var.tid
+        ttid = txn_context['ttid']
         _handleConflicts = self._handleConflicts
-        conflict_serial_dict = local_var.conflict_serial_dict
-        queue = local_var.queue
+        queue = txn_context['queue']
+        conflict_serial_dict = txn_context['conflict_serial_dict']
         pending = self.dispatcher.pending
-        _waitAnyMessage = self._waitAnyMessage
+        _waitAnyTransactionMessage = self._waitAnyTransactionMessage
         while pending(queue) or conflict_serial_dict:
-            _waitAnyMessage()
+            # Note: handler data can be overwritten by _handleConflicts
+            # so we must set it for each iteration.
+            _waitAnyTransactionMessage(txn_context)
             if conflict_serial_dict:
-                conflicts = _handleConflicts(tryToResolveConflict)
+                conflicts = _handleConflicts(txn_context,
+                    tryToResolveConflict)
                 if conflicts:
                     update(conflicts)
 
         # Check for never-stored objects, and update result for all others
         for oid, store_dict in \
-            local_var.object_stored_counter_dict.iteritems():
+                txn_context['object_stored_counter_dict'].iteritems():
             if not store_dict:
                 neo.lib.logging.error('tpc_store failed')
                 raise NEOStorageError('tpc_store failed')
@@ -757,27 +718,27 @@ class Application(object):
     @profiler_decorator
     def tpc_vote(self, transaction, tryToResolveConflict):
         """Store current transaction."""
-        local_var = self.local_var
-        if transaction is not local_var.txn:
+        txn_context = self._txn_container.get(transaction)
+        if txn_context is None or transaction is not txn_context['txn']:
             raise StorageTransactionError(self, transaction)
 
-        result = self.waitStoreResponses(tryToResolveConflict)
+        result = self.waitStoreResponses(txn_context, tryToResolveConflict)
 
-        tid = local_var.tid
+        ttid = txn_context['ttid']
         # Store data on each node
         txn_stored_counter = 0
-        packet = Packets.AskStoreTransaction(tid, str(transaction.user),
+        packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
             str(transaction.description), dumps(transaction._extension),
-            local_var.data_list)
-        add_involved_nodes = self.local_var.involved_nodes.add
-        for node, conn in self.cp.iterateForObject(tid, writable=True):
-            neo.lib.logging.debug("voting object %s on %s", dump(tid),
+            txn_context['data_list'])
+        add_involved_nodes = txn_context['involved_nodes'].add
+        for node, conn in self.cp.iterateForObject(ttid, writable=True):
+            neo.lib.logging.debug("voting object %s on %s", dump(ttid),
                 dump(conn.getUUID()))
             try:
                 self._askStorage(conn, packet)
-                add_involved_nodes(node)
             except ConnectionClosed:
                 continue
+            add_involved_nodes(node)
             txn_stored_counter += 1
 
         # check at least one storage node accepted
@@ -790,20 +751,22 @@ class Application(object):
         # tpc_finish.
         self._getMasterConnection()
 
-        local_var.txn_voted = True
+        txn_context['txn_voted'] = True
         return result
 
     @profiler_decorator
     def tpc_abort(self, transaction):
         """Abort current transaction."""
-        if transaction is not self.local_var.txn:
+        txn_container = self._txn_container
+        txn_context = txn_container.get(transaction)
+        if txn_context is None:
             return
 
-        tid = self.local_var.tid
-        p = Packets.AbortTransaction(tid)
+        ttid = txn_context['ttid']
+        p = Packets.AbortTransaction(ttid)
         getConnForNode = self.cp.getConnForNode
         # cancel transaction one all those nodes
-        for node in self.local_var.involved_nodes:
+        for node in txn_context['involved_nodes']:
             conn = getConnForNode(node)
             if conn is None:
                 continue
@@ -815,28 +778,30 @@ class Application(object):
                     'storage node %r of abortion, ignoring.',
                     conn, exc_info=1)
         self._getMasterConnection().notify(p)
-        queue = self.local_var.queue
-        self.dispatcher.forget_queue(queue)
-        self.local_var.clear()
+        queue = txn_context['queue']
+        # We don't need to flush queue, as it won't be reused by future
+        # transactions (deleted on next line & indexed by transaction object
+        # instance).
+        self.dispatcher.forget_queue(queue, flush_queue=False)
+        txn_container.delete(transaction)
 
     @profiler_decorator
     def tpc_finish(self, transaction, tryToResolveConflict, f=None):
         """Finish current transaction."""
-        local_var = self.local_var
-        if local_var.txn is not transaction:
+        txn_container = self._txn_container
+        txn_context = txn_container.get(transaction)
+        if txn_context is None:
             raise StorageTransactionError('tpc_finish called for wrong '
                 'transaction')
-        if not local_var.txn_voted:
+        if not txn_context['txn_voted']:
             self.tpc_vote(transaction, tryToResolveConflict)
         self._load_lock_acquire()
         try:
             # Call finish on master
-            oid_list = local_var.data_list
-            p = Packets.AskFinishTransaction(local_var.tid, oid_list)
-            self._askPrimary(p)
+            oid_list = txn_context['data_list']
+            p = Packets.AskFinishTransaction(txn_context['ttid'], oid_list)
+            tid = self._askPrimary(p)
 
-            # From now on, self.local_var.tid holds the "real" TID.
-            tid = local_var.tid
             # Call function given by ZODB
             if f is not None:
                 f(tid)
@@ -851,8 +816,8 @@ class Application(object):
                     assert next_tid is None, (dump(oid), dump(base_tid),
                         dump(next_tid))
                     return (data, tid)
-                get_baseTID = local_var.object_base_serial_dict.get
-                for oid, data in local_var.data_dict.iteritems():
+                get_baseTID = txn_context['object_base_serial_dict'].get
+                for oid, data in txn_context['data_dict'].iteritems():
                     if data is None:
                         # this is just a remain of
                         # checkCurrentSerialInTransaction call, ignore (no data
@@ -871,13 +836,14 @@ class Application(object):
                         mq_cache[(oid, tid)] = (data, None)
             finally:
                 self._cache_lock_release()
-            local_var.clear()
+            txn_container.delete(transaction)
             return tid
         finally:
             self._load_lock_release()
 
     def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
-        if txn is not self.local_var.txn:
+        txn_context = self._txn_container.get(txn)
+        if txn_context is None:
             raise StorageTransactionError(self, undone_tid)
 
         txn_info, txn_ext = self._getTransactionInformation(undone_tid)
@@ -899,22 +865,23 @@ class Application(object):
         getCellList = pt.getCellList
         getCellSortKey = self.cp.getCellSortKey
         getConnForCell = self.cp.getConnForCell
-        queue = self.local_var.queue
-        undo_object_tid_dict = self.local_var.undo_object_tid_dict = {}
+        queue = self._getThreadQueue()
+        ttid = txn_context['ttid']
         for partition, oid_list in partition_oid_dict.iteritems():
             cell_list = getCellList(partition, readable=True)
             shuffle(cell_list)
             cell_list.sort(key=getCellSortKey)
             storage_conn = getConnForCell(cell_list[0])
-            storage_conn.ask(Packets.AskObjectUndoSerial(self.local_var.tid,
+            storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
                 snapshot_tid, undone_tid, oid_list), queue=queue)
 
         # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
         # meaning that objects in transaction's oid_list do not exist any
         # longer. This is the symptom of a pack, so forbid undoing transaction
         # when it happens.
+        undo_object_tid_dict = {}
         try:
-            self.waitResponses()
+            self.waitResponses(queue, undo_object_tid_dict)
         except NEOStorageNotFoundError:
             self.dispatcher.forget_queue(queue)
             raise UndoError('non-undoable transaction')
@@ -929,9 +896,11 @@ class Application(object):
                 # object. This is an undo conflict, try to resolve it.
                 try:
                     # Load the latest version we are supposed to see
-                    data = self.load(snapshot_tid, oid, serial=current_serial)[0]
+                    data = self.load(snapshot_tid, oid,
+                        serial=current_serial)[0]
                     # Load the version we were undoing to
-                    undo_data = self.load(snapshot_tid, oid, serial=undo_serial)[0]
+                    undo_data = self.load(snapshot_tid, oid,
+                        serial=undo_serial)[0]
                 except NEOStorageNotFoundError:
                     raise UndoError('Object not found while resolving undo '
                         'conflict')
@@ -945,7 +914,7 @@ class Application(object):
                     raise UndoError('Some data were modified by a later ' \
                         'transaction', oid)
                 undo_serial = None
-            self._store(oid, current_serial, data, undo_serial)
+            self._store(txn_context, oid, current_serial, data, undo_serial)
 
     def _insertMetadata(self, txn_info, extension):
         for k, v in loads(extension).items():
@@ -955,7 +924,7 @@ class Application(object):
         packet = Packets.AskTransactionInformation(tid)
         for node, conn in self.cp.iterateForObject(tid, readable=True):
             try:
-                self._askStorage(conn, packet)
+                txn_info, txn_ext = self._askStorage(conn, packet)
             except ConnectionClosed:
                 continue
             except NEOStorageNotFoundError:
@@ -964,7 +933,7 @@ class Application(object):
             break
         else:
             raise NEOStorageError('Transaction %r not found' % (tid, ))
-        return (self.local_var.txn_info, self.local_var.txn_ext)
+        return (txn_info, txn_ext)
 
     def undoLog(self, first, last, filter=None, block=0):
         # XXX: undoLog is broken
@@ -978,8 +947,7 @@ class Application(object):
         pt = self.getPartitionTable()
         storage_node_list = pt.getNodeList()
 
-        self.local_var.node_tids = {}
-        queue = self.local_var.queue
+        queue = self._getThreadQueue()
         packet = Packets.AskTIDs(first, last, INVALID_PARTITION)
         for storage_node in storage_node_list:
             conn = self.cp.getConnForNode(storage_node)
@@ -988,15 +956,11 @@ class Application(object):
             conn.ask(packet, queue=queue)
 
         # Wait for answers from all storages.
-        self.waitResponses()
+        tid_set = set()
+        self.waitResponses(queue, tid_set)
 
         # Reorder tids
-        ordered_tids = set()
-        update = ordered_tids.update
-        for tid_list in self.local_var.node_tids.itervalues():
-            update(tid_list)
-        ordered_tids = list(ordered_tids)
-        ordered_tids.sort(reverse=True)
+        ordered_tids = sorted(tid_set, reverse=True)
         neo.lib.logging.debug(
                         "UndoLog tids %s", [dump(x) for x in ordered_tids])
         # For each transaction, get info
@@ -1004,11 +968,10 @@ class Application(object):
         append = undo_info.append
         for tid in ordered_tids:
             (txn_info, txn_ext) = self._getTransactionInformation(tid)
-            if filter is None or filter(self.local_var.txn_info):
-                txn_info = self.local_var.txn_info
+            if filter is None or filter(txn_info):
                 txn_info.pop('packed')
                 txn_info.pop("oids")
-                self._insertMetadata(txn_info, self.local_var.txn_ext)
+                self._insertMetadata(txn_info, txn_ext)
                 append(txn_info)
                 if len(undo_info) >= last - first:
                     break
@@ -1024,9 +987,8 @@ class Application(object):
         node_list = node_map.keys()
         node_list.sort(key=self.cp.getCellSortKey)
         partition_set = set(range(self.pt.getPartitions()))
-        queue = self.local_var.queue
+        queue = self._getThreadQueue()
         # request a tid list for each partition
-        self.local_var.tids_from = set()
         for node in node_list:
             conn = self.cp.getConnForNode(node)
             request_set = set(node_map[node]) & partition_set
@@ -1038,40 +1000,34 @@ class Application(object):
             if not partition_set:
                 break
         assert not partition_set
-        self.waitResponses()
+        tid_set = set()
+        self.waitResponses(queue, tid_set)
         # request transactions informations
         txn_list = []
         append = txn_list.append
         tid = None
-        for tid in sorted(self.local_var.tids_from):
+        for tid in sorted(tid_set):
             (txn_info, txn_ext) = self._getTransactionInformation(tid)
-            txn_info['ext'] = loads(self.local_var.txn_ext)
+            txn_info['ext'] = loads(txn_ext)
             append(txn_info)
         return (tid, txn_list)
 
     def history(self, oid, version=None, size=1, filter=None):
+        queue = self._getThreadQueue()
         # Get history informations for object first
         packet = Packets.AskObjectHistory(oid, 0, size)
         for node, conn in self.cp.iterateForObject(oid, readable=True):
-            # FIXME: we keep overwriting self.local_var.history here, we
-            # should aggregate it instead.
-            self.local_var.history = None
             try:
-                self._askStorage(conn, packet)
+                conn.ask(packet, queue=queue)
             except ConnectionClosed:
                 continue
-
-            if self.local_var.history[0] != oid:
-                # Got history for wrong oid
-                raise NEOStorageError('inconsistency in storage: asked oid ' \
-                      '%r, got %r' % (oid, self.local_var.history[0]))
-
-        if not isinstance(self.local_var.history, tuple):
-            raise NEOStorageError('history failed')
-
+        history_dict = {}
+        self.waitResponses(queue, history_dict)
         # Now that we have object informations, get txn informations
         history_list = []
-        for serial, size in self.local_var.history[1]:
+        append = history_list.append
+        for serial in sorted(history_dict.keys(), reverse=True):
+            size = history_dict[serial]
             txn_info, txn_ext = self._getTransactionInformation(serial)
             # create history dict
             txn_info.pop('id')
@@ -1081,9 +1037,8 @@ class Application(object):
             txn_info['version'] = ''
             txn_info['size'] = size
             if filter is None or filter(txn_info):
-                history_list.append(txn_info)
+                append(txn_info)
             self._insertMetadata(txn_info, txn_ext)
-
         return history_list
 
     @profiler_decorator
@@ -1111,16 +1066,15 @@ class Application(object):
         return Iterator(self, start, stop)
 
     def lastTransaction(self):
-        self._askPrimary(Packets.AskLastTransaction())
-        return self.local_var.last_transaction
+        return self._askPrimary(Packets.AskLastTransaction())
 
     def abortVersion(self, src, transaction):
-        if transaction is not self.local_var.txn:
+        if self._txn_container.get(transaction) is None:
             raise StorageTransactionError(self, transaction)
         return '', []
 
     def commitVersion(self, src, dest, transaction):
-        if transaction is not self.local_var.txn:
+        if self._txn_container.get(transaction) is None:
             raise StorageTransactionError(self, transaction)
         return '', []
 
@@ -1141,12 +1095,6 @@ class Application(object):
     def invalidationBarrier(self):
         self._askPrimary(Packets.AskBarrier())
 
-    def setTID(self, value):
-        self.local_var.tid = value
-
-    def getTID(self):
-        return self.local_var.tid
-
     def pack(self, t):
         tid = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60, )))
         if tid == ZERO_TID:
@@ -1166,24 +1114,27 @@ class Application(object):
         return self.load(None, oid)[1]
 
     def checkCurrentSerialInTransaction(self, oid, serial, transaction):
-        local_var = self.local_var
-        if transaction is not local_var.txn:
+        txn_context = self._txn_container.get(transaction)
+        if txn_context is None:
               raise StorageTransactionError(self, transaction)
-        local_var.object_serial_dict[oid] = serial
+        self._checkCurrentSerialInTransaction(txn_context, oid, serial)
+
+    def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
+        ttid = txn_context['ttid']
+        txn_context['object_serial_dict'][oid] = serial
         # Placeholders
-        queue = local_var.queue
-        local_var.object_stored_counter_dict[oid] = {}
-        data_dict = local_var.data_dict
+        queue = txn_context['queue']
+        txn_context['object_stored_counter_dict'][oid] = {}
+        data_dict = txn_context['data_dict']
         if oid not in data_dict:
             # Marker value so we don't try to resolve conflicts.
             data_dict[oid] = None
-            local_var.data_list.append(oid)
-        packet = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
+            txn_context['data_list'].append(oid)
+        packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
         for node, conn in self.cp.iterateForObject(oid, writable=True):
             try:
                 conn.ask(packet, queue=queue)
             except ConnectionClosed:
                 continue
-
-        self._waitAnyMessage(False)
+        self._waitAnyTransactionMessage(txn_context, False)
 

Added: trunk/neo/client/container.py
==============================================================================
--- trunk/neo/client/container.py (added)
+++ trunk/neo/client/container.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -0,0 +1,82 @@
+#
+# Copyright (C) 2011  Nexedi SA
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+from thread import get_ident
+from neo.lib.locking import Queue
+
+class ContainerBase(object):
+    def __init__(self):
+        self._context_dict = {}
+
+    def _getID(self, *args, **kw):
+        raise NotImplementedError
+
+    def _new(self, *args, **kw):
+        raise NotImplementedError
+
+    def delete(self, *args, **kw):
+        del self._context_dict[self._getID(*args, **kw)]
+
+    def get(self, *args, **kw):
+        return self._context_dict.get(self._getID(*args, **kw))
+
+    def new(self, *args, **kw):
+        result = self._context_dict[self._getID(*args, **kw)] = self._new(
+            *args, **kw)
+        return result
+
+class ThreadContainer(ContainerBase):
+    def _getID(self):
+        return get_ident()
+
+    def _new(self):
+        return {
+            'queue': Queue(0),
+            'answer': None,
+        }
+
+    def get(self):
+        """
+        Implicitely create a thread context if it doesn't exist.
+        """
+        my_id = self._getID()
+        try:
+            result = self._context_dict[my_id]
+        except KeyError:
+            result = self._context_dict[my_id] = self._new()
+        return result
+
+class TransactionContainer(ContainerBase):
+    def _getID(self, txn):
+        return id(txn)
+
+    def _new(self, txn):
+        return {
+            'queue': Queue(0),
+            'txn': txn,
+            'ttid': None,
+            'data_dict': {},
+            'data_list': [],
+            'object_base_serial_dict': {},
+            'object_serial_dict': {},
+            'object_stored_counter_dict': {},
+            'conflict_serial_dict': {},
+            'resolved_conflict_serial_dict': {},
+            'txn_voted': False,
+            'involved_nodes': set(),
+        }
+

Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -156,21 +156,19 @@ class PrimaryNotificationsHandler(BaseHa
 class PrimaryAnswersHandler(AnswerBaseHandler):
     """ Handle that process expected packets from the primary master """
 
-    def answerBeginTransaction(self, conn, tid):
-        self.app.setTID(tid)
+    def answerBeginTransaction(self, conn, ttid):
+        self.app.setHandlerData(ttid)
 
     def answerNewOIDs(self, conn, oid_list):
         self.app.new_oid_list = oid_list
 
-    def answerTransactionFinished(self, conn, ttid, tid):
-        if ttid != self.app.getTID():
-            raise NEOStorageError('Wrong TID, transaction not started')
-        self.app.setTID(tid)
+    def answerTransactionFinished(self, conn, _, tid):
+        self.app.setHandlerData(tid)
 
     def answerPack(self, conn, status):
         if not status:
             raise NEOStorageError('Already packing')
 
     def answerLastTransaction(self, conn, ltid):
-        self.app.local_var.last_transaction = ltid
+        self.app.setHandlerData(ltid)
 

Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -68,23 +68,25 @@ class StorageAnswersHandler(AnswerBaseHa
         if data_serial is not None:
             raise NEOStorageError, 'Storage should never send non-None ' \
                 'data_serial to clients, got %s' % (dump(data_serial), )
-        self.app.local_var.asked_object = (oid, start_serial, end_serial,
-                compression, checksum, data)
+        self.app.setHandlerData((oid, start_serial, end_serial,
+                compression, checksum, data))
 
     def answerStoreObject(self, conn, conflicting, oid, serial):
-        local_var = self.app.local_var
-        object_stored_counter_dict = local_var.object_stored_counter_dict[oid]
+        txn_context = self.app.getHandlerData()
+        object_stored_counter_dict = txn_context[
+            'object_stored_counter_dict'][oid]
         if conflicting:
             neo.lib.logging.info('%r report a conflict for %r with %r', conn,
                         dump(oid), dump(serial))
-            conflict_serial_dict = local_var.conflict_serial_dict
+            conflict_serial_dict = txn_context['conflict_serial_dict']
             if serial in object_stored_counter_dict:
                 raise NEOStorageError, 'A storage accepted object for ' \
                     'serial %s but another reports a conflict for it.' % (
                         dump(serial), )
             # If this conflict is not already resolved, mark it for
             # resolution.
-            if serial not in local_var.resolved_conflict_serial_dict.get(oid, ()):
+            if serial not in txn_context[
+                    'resolved_conflict_serial_dict'].get(oid, ()):
                 conflict_serial_dict.setdefault(oid, set()).add(serial)
         else:
             object_stored_counter_dict[serial] = \
@@ -92,31 +94,29 @@ class StorageAnswersHandler(AnswerBaseHa
 
     answerCheckCurrentSerial = answerStoreObject
 
-    def answerStoreTransaction(self, conn, tid):
-        if tid != self.app.getTID():
-            raise NEOStorageError('Wrong TID, transaction not started')
+    def answerStoreTransaction(self, conn, _):
+        pass
 
     def answerTIDsFrom(self, conn, tid_list):
         neo.lib.logging.debug('Get %d TIDs from %r', len(tid_list), conn)
-        assert not self.app.local_var.tids_from.intersection(set(tid_list))
-        self.app.local_var.tids_from.update(tid_list)
+        tids_from = self.app.getHandlerData()
+        assert not tids_from.intersection(set(tid_list))
+        tids_from.update(tid_list)
 
     def answerTransactionInformation(self, conn, tid,
                                            user, desc, ext, packed, oid_list):
-        # transaction information are returned as a dict
-        info = {}
-        info['time'] = TimeStamp(tid).timeTime()
-        info['user_name'] = user
-        info['description'] = desc
-        info['id'] = tid
-        info['oids'] = oid_list
-        info['packed'] = packed
-        self.app.local_var.txn_ext = ext
-        self.app.local_var.txn_info = info
+        self.app.setHandlerData(({
+            'time': TimeStamp(tid).timeTime(),
+            'user_name': user,
+            'description': desc,
+            'id': tid,
+            'oids': oid_list,
+            'packed': packed,
+        }, ext))
 
-    def answerObjectHistory(self, conn, oid, history_list):
+    def answerObjectHistory(self, conn, _, history_list):
         # history_list is a list of tuple (serial, size)
-        self.app.local_var.history = oid, history_list
+        self.app.getHandlerData().update(history_list)
 
     def oidNotFound(self, conn, message):
         # This can happen either when :
@@ -132,10 +132,10 @@ class StorageAnswersHandler(AnswerBaseHa
         raise NEOStorageNotFoundError(message)
 
     def answerTIDs(self, conn, tid_list):
-        self.app.local_var.node_tids[conn.getUUID()] = tid_list
+        self.app.getHandlerData().update(tid_list)
 
     def answerObjectUndoSerial(self, conn, object_tid_dict):
-        self.app.local_var.undo_object_tid_dict.update(object_tid_dict)
+        self.app.getHandlerData().update(object_tid_dict)
 
     def answerHasLock(self, conn, oid, status):
         if status == LockState.GRANTED_TO_OTHER:

Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -66,9 +66,7 @@ class ConnectionPool(object):
         p = Packets.RequestIdentification(NodeTypes.CLIENT,
             app.uuid, None, app.name)
         try:
-            msg_id = conn.ask(p, queue=app.local_var.queue)
-            app._waitMessage(conn, msg_id,
-                handler=app.storage_bootstrap_handler)
+            app._ask(conn, p, handler=app.storage_bootstrap_handler)
         except ConnectionClosed:
             neo.lib.logging.error('Connection to %r failed', node)
             self.notifyFailure(node)

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -15,7 +15,6 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-import new
 import unittest
 from cPickle import dumps
 from mock import Mock, ReturnValues
@@ -24,7 +23,8 @@ from neo.tests import NeoUnitTestBase
 from neo.client.app import Application, RevisionIndex
 from neo.client.exception import NEOStorageError, NEOStorageNotFoundError
 from neo.client.exception import NEOStorageDoesNotExistError
-from neo.lib.protocol import Packet, Packets, Errors, INVALID_TID
+from neo.lib.protocol import Packet, Packets, Errors, INVALID_TID, \
+    INVALID_PARTITION
 from neo.lib.util import makeChecksum
 import time
 
@@ -45,11 +45,14 @@ def getPartitionTable(self):
         self.master_conn = _getMasterConnection(self)
     return self.pt
 
-def _waitMessage(self, conn, msg_id, handler=None):
+def _ask(self, conn, packet, handler=None):
+    conn.ask(packet)
+    self.setHandlerData(None)
     if handler is None:
         raise NotImplementedError
     else:
         handler.dispatch(conn, conn.fakeReceived())
+    return self.getHandlerData()
 
 def resolving_tryToResolveConflict(oid, conflict_serial, serial, data):
     return data
@@ -63,10 +66,10 @@ class ClientApplicationTests(NeoUnitTest
         NeoUnitTestBase.setUp(self)
         # apply monkey patches
         self._getMasterConnection = Application._getMasterConnection
-        self._waitMessage = Application._waitMessage
+        self._ask = Application._ask
         self.getPartitionTable = Application.getPartitionTable
         Application._getMasterConnection = _getMasterConnection
-        Application._waitMessage = _waitMessage
+        Application._ask = _ask
         Application.getPartitionTable = getPartitionTable
         self._to_stop_list = []
 
@@ -76,12 +79,19 @@ class ClientApplicationTests(NeoUnitTest
             app.close()
         # restore environnement
         Application._getMasterConnection = self._getMasterConnection
-        Application._waitMessage = self._waitMessage
+        Application._ask = self._ask
         Application.getPartitionTable = self.getPartitionTable
         NeoUnitTestBase.tearDown(self)
 
     # some helpers
 
+    def _begin(self, app, txn, tid=None):
+        txn_context = app._txn_container.new(txn)
+        if tid is None:
+            tid = self.makeTID()
+        txn_context['ttid'] = tid
+        return txn_context
+
     def checkAskPacket(self, conn, packet_type, decode=False):
         calls = conn.mockGetNamedCalls('ask')
         self.assertEquals(len(calls), 1)
@@ -154,13 +164,6 @@ class ClientApplicationTests(NeoUnitTest
         #self.assertEquals(calls[0].getParam(0), conn)
         #self.assertTrue(isinstance(calls[0].getParam(2), Queue))
 
-    def test_getQueue(self):
-        app = self.getApp()
-        # Test sanity check
-        self.assertTrue(getattr(app, 'local_var', None) is not None)
-        # Test that queue is created
-        self.assertTrue(getattr(app.local_var, 'queue', None) is not None)
-
     def test_registerDB(self):
         app = self.getApp()
         dummy_db = []
@@ -186,7 +189,6 @@ class ClientApplicationTests(NeoUnitTest
 
     def test_load(self):
         app = self.getApp()
-        app.local_var.barrier_done = True
         mq = app.mq_cache
         oid = self.makeOID()
         tid1 = self.makeTID(1)
@@ -235,7 +237,6 @@ class ClientApplicationTests(NeoUnitTest
             'fakeReceived': packet,
         })
         app.cp = self.getConnectionPool([(Mock(), conn)])
-        app.local_var.asked_object = an_object[:-1]
         answer_barrier = Packets.AnswerBarrier()
         answer_barrier.setId(1)
         app.master_conn = Mock({
@@ -257,7 +258,6 @@ class ClientApplicationTests(NeoUnitTest
 
     def test_loadSerial(self):
         app = self.getApp()
-        app.local_var.barrier_done = True
         mq = app.mq_cache
         oid = self.makeOID()
         tid1 = self.makeTID(1)
@@ -292,7 +292,6 @@ class ClientApplicationTests(NeoUnitTest
             'fakeReceived': packet,
         })
         app.cp = self.getConnectionPool([(Mock(), conn)])
-        app.local_var.asked_object = another_object[:-1]
         result = loadSerial(oid, tid1)
         self.assertEquals(result, 'RIGHT')
         self.checkAskObject(conn)
@@ -300,7 +299,6 @@ class ClientApplicationTests(NeoUnitTest
 
     def test_loadBefore(self):
         app = self.getApp()
-        app.local_var.barrier_done = True
         mq = app.mq_cache
         oid = self.makeOID()
         tid1 = self.makeTID(1)
@@ -332,7 +330,6 @@ class ClientApplicationTests(NeoUnitTest
             'fakeReceived': packet,
         })
         app.cp = self.getConnectionPool([(Mock(), conn)])
-        app.local_var.asked_object = an_object[:-1]
         self.assertRaises(NEOStorageError, loadBefore, oid, tid1)
         # object should not have been cached
         self.assertFalse((oid, tid1) in mq)
@@ -349,7 +346,6 @@ class ClientApplicationTests(NeoUnitTest
             'fakeReceived': packet,
         })
         app.cp = self.getConnectionPool([(Mock(), conn)])
-        app.local_var.asked_object = another_object
         result = loadBefore(oid, tid3)
         self.assertEquals(result, ('RIGHT', tid2, tid3))
         self.checkAskObject(conn)
@@ -369,16 +365,17 @@ class ClientApplicationTests(NeoUnitTest
             'fakeReceived': packet,
         })
         app.tpc_begin(transaction=txn, tid=tid)
-        self.assertTrue(app.local_var.txn is txn)
-        self.assertEquals(app.local_var.tid, tid)
+        txn_context = app._txn_container.get(txn)
+        self.assertTrue(txn_context['txn'] is txn)
+        self.assertEquals(txn_context['ttid'], tid)
         # next, the transaction already begin -> raise
         self.assertRaises(StorageTransactionError, app.tpc_begin,
             transaction=txn, tid=None)
-        self.assertTrue(app.local_var.txn is txn)
-        self.assertEquals(app.local_var.tid, tid)
-        # cancel and start a transaction without tid
-        app.local_var.txn = None
-        app.local_var.tid = None
+        txn_context = app._txn_container.get(txn)
+        self.assertTrue(txn_context['txn'] is txn)
+        self.assertEquals(txn_context['ttid'], tid)
+        # start a transaction without tid
+        txn = Mock()
         # no connection -> NEOStorageError (wait until connected to primary)
         #self.assertRaises(NEOStorageError, app.tpc_begin, transaction=txn, tid=None)
         # ask a tid to pmn
@@ -392,8 +389,9 @@ class ClientApplicationTests(NeoUnitTest
         self.checkAskNewTid(app.master_conn)
         self.checkDispatcherRegisterCalled(app, app.master_conn)
         # check attributes
-        self.assertTrue(app.local_var.txn is txn)
-        self.assertEquals(app.local_var.tid, tid)
+        txn_context = app._txn_container.get(txn)
+        self.assertTrue(txn_context['txn'] is txn)
+        self.assertEquals(txn_context['ttid'], tid)
 
     def test_store1(self):
         app = self.getApp()
@@ -401,14 +399,10 @@ class ClientApplicationTests(NeoUnitTest
         tid = self.makeTID()
         txn = self.makeTransactionObject()
         # invalid transaction > StorageTransactionError
-        app.local_var.txn = old_txn = object()
-        self.assertTrue(app.local_var.txn is not txn)
         self.assertRaises(StorageTransactionError, app.store, oid, tid, '',
             None, txn)
-        self.assertEquals(app.local_var.txn, old_txn)
         # check partition_id and an empty cell list -> NEOStorageError
-        app.local_var.txn = txn
-        app.local_var.tid = tid
+        self._begin(app, txn, self.makeTID())
         app.pt = Mock({ 'getCellListForOID': (), })
         app.num_partitions = 2
         self.assertRaises(NEOStorageError, app.store, oid, tid, '',  None,
@@ -423,8 +417,7 @@ class ClientApplicationTests(NeoUnitTest
         tid = self.makeTID()
         txn = self.makeTransactionObject()
         # build conflicting state
-        app.local_var.txn = txn
-        app.local_var.tid = tid
+        txn_context = self._begin(app, txn, tid)
         packet = Packets.AnswerStoreObject(conflicting=1, oid=oid, serial=tid)
         packet.setId(0)
         storage_address = ('127.0.0.1', 10020)
@@ -436,14 +429,15 @@ class ClientApplicationTests(NeoUnitTest
                 return not queue.empty()
         app.dispatcher = Dispatcher()
         app.nm.createStorage(address=storage_address)
-        app.local_var.data_dict[oid] = 'BEFORE'
-        app.local_var.data_list.append(oid)
+        data_dict = txn_context['data_dict']
+        data_dict[oid] = 'BEFORE'
+        txn_context['data_list'].append(oid)
         app.store(oid, tid, '', None, txn)
-        app.local_var.queue.put((conn, packet))
-        self.assertRaises(ConflictError, app.waitStoreResponses,
+        txn_context['queue'].put((conn, packet))
+        self.assertRaises(ConflictError, app.waitStoreResponses, txn_context,
             failing_tryToResolveConflict)
-        self.assertTrue(oid not in app.local_var.data_dict)
-        self.assertEquals(app.local_var.object_stored_counter_dict[oid], {})
+        self.assertTrue(oid not in data_dict)
+        self.assertEquals(txn_context['object_stored_counter_dict'][oid], {})
         self.checkAskStoreObject(conn)
 
     def test_store3(self):
@@ -452,8 +446,7 @@ class ClientApplicationTests(NeoUnitTest
         tid = self.makeTID()
         txn = self.makeTransactionObject()
         # case with no conflict
-        app.local_var.txn = txn
-        app.local_var.tid = tid
+        txn_context = self._begin(app, txn, tid)
         packet = Packets.AnswerStoreObject(conflicting=0, oid=oid, serial=tid)
         packet.setId(0)
         storage_address = ('127.0.0.1', 10020)
@@ -467,47 +460,25 @@ class ClientApplicationTests(NeoUnitTest
         app.nm.createStorage(address=storage_address)
         app.store(oid, tid, 'DATA', None, txn)
         self.checkAskStoreObject(conn)
-        app.local_var.queue.put((conn, packet))
-        app.waitStoreResponses(resolving_tryToResolveConflict)
-        self.assertEquals(app.local_var.object_stored_counter_dict[oid], {tid: 1})
-        self.assertEquals(app.local_var.data_dict.get(oid, None), 'DATA')
-        self.assertFalse(oid in app.local_var.conflict_serial_dict)
+        txn_context['queue'].put((conn, packet))
+        app.waitStoreResponses(txn_context, resolving_tryToResolveConflict)
+        self.assertEquals(txn_context['object_stored_counter_dict'][oid],
+            {tid: 1})
+        self.assertEquals(txn_context['data_dict'].get(oid, None), 'DATA')
+        self.assertFalse(oid in txn_context['conflict_serial_dict'])
 
     def test_tpc_vote1(self):
         app = self.getApp()
-        oid = self.makeOID(11)
         txn = self.makeTransactionObject()
         # invalid transaction > StorageTransactionError
-        app.local_var.txn = old_txn = object()
-        self.assertTrue(app.local_var.txn is not txn)
         self.assertRaises(StorageTransactionError, app.tpc_vote, txn,
             resolving_tryToResolveConflict)
-        self.assertEquals(app.local_var.txn, old_txn)
-
-    def test_tpc_vote2(self):
-        # fake transaction object
-        app = self.getApp()
-        app.local_var.txn = self.makeTransactionObject()
-        app.local_var.tid = self.makeTID()
-        # wrong answer -> failure
-        packet = Packets.AnswerStoreTransaction(INVALID_TID)
-        packet.setId(0)
-        conn = Mock({
-            'getNextId': 1,
-            'fakeReceived': packet,
-            'getAddress': ('127.0.0.1', 0),
-        })
-        app.cp = self.getConnectionPool([(Mock(), conn)])
-        self.assertRaises(NEOStorageError, app.tpc_vote, app.local_var.txn,
-            resolving_tryToResolveConflict)
-        self.checkAskPacket(conn, Packets.AskStoreTransaction)
 
     def test_tpc_vote3(self):
         app = self.getApp()
         tid = self.makeTID()
         txn = self.makeTransactionObject()
-        app.local_var.txn = txn
-        app.local_var.tid = tid
+        self._begin(app, txn, tid)
         # response -> OK
         packet = Packets.AnswerStoreTransaction(tid=tid)
         packet.setId(0)
@@ -529,10 +500,9 @@ class ClientApplicationTests(NeoUnitTest
         app = self.getApp()
         tid = self.makeTID()
         txn = self.makeTransactionObject()
-        app.local_var.txn = old_txn = object()
+        old_txn = object()
+        self._begin(app, old_txn, tid)
         app.master_conn = Mock()
-        app.local_var.tid = tid
-        self.assertFalse(app.local_var.txn is txn)
         conn = Mock()
         cell = Mock()
         app.pt = Mock({'getCellListForTID': (cell, cell)})
@@ -541,8 +511,9 @@ class ClientApplicationTests(NeoUnitTest
         # no packet sent
         self.checkNoPacketSent(conn)
         self.checkNoPacketSent(app.master_conn)
-        self.assertEquals(app.local_var.txn, old_txn)
-        self.assertEquals(app.local_var.tid, tid)
+        txn_context = app._txn_container.get(old_txn)
+        self.assertTrue(txn_context['txn'] is old_txn)
+        self.assertEquals(txn_context['ttid'], tid)
 
     def test_tpc_abort2(self):
         # 2 nodes : 1 transaction in the first, 2 objects in the second
@@ -552,7 +523,7 @@ class ClientApplicationTests(NeoUnitTest
         oid1, oid2 = self.makeOID(2), self.makeOID(4) # on partition 0
         app, tid = self.getApp(), self.makeTID(1)     # on partition 1
         txn = self.makeTransactionObject()
-        app.local_var.txn, app.local_var.tid = txn, tid
+        txn_context = self._begin(app, txn, tid)
         app.master_conn = Mock({'__hash__': 0})
         app.num_partitions = 2
         cell1 = Mock({ 'getNode': 'NODE1', '__hash__': 1 })
@@ -560,16 +531,13 @@ class ClientApplicationTests(NeoUnitTest
         conn1, conn2 = Mock({ 'getNextId': 1, }), Mock({ 'getNextId': 2, })
         app.cp = Mock({ 'getConnForNode': ReturnValues(conn1, conn2), })
         # fake data
-        app.local_var.data_dict = {oid1: '', oid2: ''}
-        app.local_var.involved_nodes = set([cell1, cell2])
+        txn_context['involved_nodes'].update([cell1, cell2])
         app.tpc_abort(txn)
         # will check if there was just one call/packet :
         self.checkNotifyPacket(conn1, Packets.AbortTransaction)
         self.checkNotifyPacket(conn2, Packets.AbortTransaction)
-        self.assertEquals(app.local_var.tid, None)
-        self.assertEquals(app.local_var.txn, None)
-        self.assertEquals(app.local_var.data_dict, {})
-        self.assertEquals(app.local_var.txn_voted, False)
+        self.checkNotifyPacket(app.master_conn, Packets.AbortTransaction)
+        self.assertEqual(app._txn_container.get(txn), None)
 
     def test_tpc_abort3(self):
         """ check that abort is sent to all nodes involved in the transaction """
@@ -617,19 +585,20 @@ class ClientApplicationTests(NeoUnitTest
         })
         app.master_conn = Mock({'__hash__': 0})
         txn = self.makeTransactionObject()
-        app.local_var.txn, app.local_var.tid = txn, tid
+        txn_context = self._begin(app, txn, tid)
         class Dispatcher(object):
             def pending(self, queue):
                 return not queue.empty()
 
-            def forget_queue(self, queue):
+            def forget_queue(self, queue, flush_queue=True):
                 pass
         app.dispatcher = Dispatcher()
         # conflict occurs on storage 2
         app.store(oid1, tid, 'DATA', None, txn)
         app.store(oid2, tid, 'DATA', None, txn)
-        app.local_var.queue.put((conn2, packet2))
-        app.local_var.queue.put((conn3, packet3))
+        queue = txn_context['queue']
+        queue.put((conn2, packet2))
+        queue.put((conn3, packet3))
         # vote fails as the conflict is not resolved, nothing is sent to storage 3
         self.assertRaises(ConflictError, app.tpc_vote, txn, failing_tryToResolveConflict)
         # abort must be sent to storage 1 and 2
@@ -640,59 +609,11 @@ class ClientApplicationTests(NeoUnitTest
     def test_tpc_finish1(self):
         # transaction mismatch: raise
         app = self.getApp()
-        tid = self.makeTID()
         txn = self.makeTransactionObject()
-        app.local_var.txn = old_txn = object()
         app.master_conn = Mock()
-        self.assertFalse(app.local_var.txn is txn)
-        conn = Mock()
         self.assertRaises(StorageTransactionError, app.tpc_finish, txn, None)
         # no packet sent
-        self.checkNoPacketSent(conn)
         self.checkNoPacketSent(app.master_conn)
-        self.assertEquals(app.local_var.txn, old_txn)
-
-    def test_tpc_finish2(self):
-        # bad answer -> NEOStorageError
-        app = self.getApp()
-        tid = self.makeTID()
-        txn = self.makeTransactionObject()
-        app.local_var.txn, app.local_var.tid = txn, tid
-        # test callable passed to tpc_finish
-        self.f_called = False
-        self.f_called_with_tid = None
-        def hook(tid):
-            self.f_called = True
-            self.f_called_with_tid = tid
-        packet = Packets.AnswerTransactionFinished(INVALID_TID, INVALID_TID)
-        packet.setId(0)
-        app.master_conn = Mock({
-            'getNextId': 1,
-            'getAddress': ('127.0.0.1', 10000),
-            'fakeReceived': packet,
-        })
-        self.vote_params = None
-        tpc_vote = app.tpc_vote
-        def voteDetector(transaction, tryToResolveConflict):
-            self.vote_params = (transaction, tryToResolveConflict)
-        dummy_tryToResolveConflict = []
-        app.tpc_vote = voteDetector
-        app.local_var.txn_voted = True
-        self.assertRaises(NEOStorageError, app.tpc_finish, txn,
-            dummy_tryToResolveConflict, hook)
-        self.assertFalse(self.f_called)
-        self.assertEqual(self.vote_params, None)
-        self.checkAskFinishTransaction(app.master_conn)
-        self.checkDispatcherRegisterCalled(app, app.master_conn)
-        # Call again, but this time transaction is not voted yet
-        app.local_var.txn_voted = False
-        self.f_called = False
-        self.assertRaises(NEOStorageError, app.tpc_finish, txn,
-            dummy_tryToResolveConflict, hook)
-        self.assertFalse(self.f_called)
-        self.assertTrue(self.vote_params[0] is txn)
-        self.assertTrue(self.vote_params[1] is dummy_tryToResolveConflict)
-        app.tpc_vote = tpc_vote
 
     def test_tpc_finish3(self):
         # transaction is finished
@@ -700,7 +621,7 @@ class ClientApplicationTests(NeoUnitTest
         tid = self.makeTID()
         ttid = self.makeTID()
         txn = self.makeTransactionObject()
-        app.local_var.txn, app.local_var.tid = txn, ttid
+        txn_context = self._begin(app, txn, tid)
         self.f_called = False
         self.f_called_with_tid = None
         def hook(tid):
@@ -713,17 +634,13 @@ class ClientApplicationTests(NeoUnitTest
             'getAddress': ('127.0.0.1', 10010),
             'fakeReceived': packet,
         })
-        app.local_var.txn_voted = True
-        app.local_var.txn_finished = True
+        txn_context['txn_voted'] = True
         app.tpc_finish(txn, None, hook)
         self.assertTrue(self.f_called)
         self.assertEquals(self.f_called_with_tid, tid)
         self.checkAskFinishTransaction(app.master_conn)
         #self.checkDispatcherRegisterCalled(app, app.master_conn)
-        self.assertEquals(app.local_var.tid, None)
-        self.assertEquals(app.local_var.txn, None)
-        self.assertEquals(app.local_var.data_dict, {})
-        self.assertEquals(app.local_var.txn_voted, False)
+        self.assertEqual(app._txn_container.get(txn), None)
 
     def test_undo1(self):
         # invalid transaction
@@ -731,21 +648,15 @@ class ClientApplicationTests(NeoUnitTest
         tid = self.makeTID()
         snapshot_tid = self.getNextTID()
         txn = self.makeTransactionObject()
-        marker = []
         def tryToResolveConflict(oid, conflict_serial, serial, data):
-            marker.append(1)
-        app.local_var.txn = old_txn = object()
+            pass
         app.master_conn = Mock()
-        self.assertFalse(app.local_var.txn is txn)
         conn = Mock()
         self.assertRaises(StorageTransactionError, app.undo, snapshot_tid, tid,
             txn, tryToResolveConflict)
         # no packet sent
         self.checkNoPacketSent(conn)
         self.checkNoPacketSent(app.master_conn)
-        # nothing done
-        self.assertEquals(marker, [])
-        self.assertEquals(app.local_var.txn, old_txn)
 
     def _getAppForUndoTests(self, oid0, tid0, tid1, tid2):
         app = self.getApp()
@@ -780,10 +691,10 @@ class ClientApplicationTests(NeoUnitTest
             return ({tid0: 'dummy', tid2: 'cdummy'}[serial], None, None)
         app.load = load
         store_marker = []
-        def _store(oid, serial, data, data_serial=None):
+        def _store(txn_context, oid, serial, data, data_serial=None,
+                unlock=False):
             store_marker.append((oid, serial, data, data_serial))
         app._store = _store
-        app.local_var.clear()
         return app, conn, store_marker
 
     def test_undoWithResolutionSuccess(self):
@@ -805,7 +716,7 @@ class ClientApplicationTests(NeoUnitTest
         undo_serial = Packets.AnswerObjectUndoSerial({
             oid0: (tid2, tid0, False)})
         undo_serial.setId(2)
-        app.local_var.queue.put((conn, undo_serial))
+        app._getThreadQueue().put((conn, undo_serial))
         marker = []
         def tryToResolveConflict(oid, conflict_serial, serial, data,
                 committedData=''):
@@ -846,7 +757,7 @@ class ClientApplicationTests(NeoUnitTest
         undo_serial.setId(2)
         app, conn, store_marker = self._getAppForUndoTests(oid0, tid0, tid1,
             tid2)
-        app.local_var.queue.put((conn, undo_serial))
+        app._getThreadQueue().put((conn, undo_serial))
         marker = []
         def tryToResolveConflict(oid, conflict_serial, serial, data,
                 committedData=''):
@@ -872,7 +783,7 @@ class ClientApplicationTests(NeoUnitTest
             marker.append((oid, conflict_serial, serial, data, committedData))
             raise ConflictError
         # The undo
-        app.local_var.queue.put((conn, undo_serial))
+        app._getThreadQueue().put((conn, undo_serial))
         self.assertRaises(UndoError, app.undo, snapshot_tid, tid1, txn,
             tryToResolveConflict)
         # Checking what happened
@@ -905,7 +816,7 @@ class ClientApplicationTests(NeoUnitTest
         undo_serial.setId(2)
         app, conn, store_marker = self._getAppForUndoTests(oid0, tid0, tid1,
             tid2)
-        app.local_var.queue.put((conn, undo_serial))
+        app._getThreadQueue().put((conn, undo_serial))
         def tryToResolveConflict(oid, conflict_serial, serial, data,
                 committedData=''):
             raise Exception, 'Test called conflict resolution, but there ' \
@@ -929,15 +840,19 @@ class ClientApplicationTests(NeoUnitTest
         cell1, cell2 = Mock({}), Mock({})
         tid1, tid2 = self.makeTID(1), self.makeTID(2)
         oid1, oid2 = self.makeOID(1), self.makeOID(2)
-        # TIDs packets supplied by _waitMessage hook
+        # TIDs packets supplied by _ask hook
         # TXN info packets
         extension = dumps({})
+        p1 = Packets.AnswerTIDs([tid1])
+        p2 = Packets.AnswerTIDs([tid2])
         p3 = Packets.AnswerTransactionInformation(tid1, '', '',
                 extension, False, (oid1, ))
         p4 = Packets.AnswerTransactionInformation(tid2, '', '',
                 extension, False, (oid2, ))
-        p3.setId(0)
-        p4.setId(1)
+        p1.setId(0)
+        p2.setId(1)
+        p3.setId(2)
+        p4.setId(3)
         conn = Mock({
             'getNextId': 1,
             'getUUID': ReturnValues(uuid1, uuid2),
@@ -945,17 +860,36 @@ class ClientApplicationTests(NeoUnitTest
             'fakeReceived': ReturnValues(p3, p4),
             'getAddress': ('127.0.0.1', 10010),
         })
+        storage_1_conn = Mock()
+        storage_2_conn = Mock()
         app.pt = Mock({
             'getNodeList': (node1, node2, ),
             'getCellListForTID': ReturnValues([cell1], [cell2]),
         })
-        app.cp = self.getConnectionPool([(Mock(), conn)])
-        def waitResponses(self):
-            self.local_var.node_tids = {uuid1: (tid1, ), uuid2: (tid2, )}
-        app.waitResponses = new.instancemethod(waitResponses, app, Application)
+        app.cp = Mock({
+            'getConnForNode': ReturnValues(storage_1_conn, storage_2_conn),
+            'iterateForObject': [(Mock(), conn)]
+        })
+        def waitResponses(queue, handler_data):
+            app.setHandlerData(handler_data)
+            for p in (p1, p2):
+                app._handlePacket(Mock(), p, handler=app.storage_handler)
+        app.waitResponses = waitResponses
         def txn_filter(info):
             return info['id'] > '\x00' * 8
-        result = app.undoLog(0, 4, filter=txn_filter)
+        first = 0
+        last = 4
+        result = app.undoLog(first, last, filter=txn_filter)
+        pfirst, plast, ppartition = self.checkAskPacket(storage_1_conn,
+            Packets.AskTIDs, decode=True)
+        self.assertEqual(pfirst, first)
+        self.assertEqual(plast, last)
+        self.assertEqual(ppartition, INVALID_PARTITION)
+        pfirst, plast, ppartition = self.checkAskPacket(storage_2_conn,
+            Packets.AskTIDs, decode=True)
+        self.assertEqual(pfirst, first)
+        self.assertEqual(plast, last)
+        self.assertEqual(ppartition, INVALID_PARTITION)
         self.assertEquals(result[0]['id'], tid1)
         self.assertEquals(result[1]['id'], tid2)
 
@@ -968,9 +902,9 @@ class ClientApplicationTests(NeoUnitTest
         p2 = Packets.AnswerObjectHistory(oid, object_history)
         extension = dumps({'k': 'v'})
         # transaction history
-        p3 = Packets.AnswerTransactionInformation(tid1, 'u', 'd',
+        p3 = Packets.AnswerTransactionInformation(tid2, 'u', 'd',
                 extension, False, (oid, ))
-        p4 = Packets.AnswerTransactionInformation(tid2, 'u', 'd',
+        p4 = Packets.AnswerTransactionInformation(tid1, 'u', 'd',
                 extension, False, (oid, ))
         p2.setId(0)
         p3.setId(1)
@@ -979,7 +913,7 @@ class ClientApplicationTests(NeoUnitTest
         conn = Mock({
             'getNextId': 1,
             'fakeGetApp': app,
-            'fakeReceived': ReturnValues(p2, p3, p4),
+            'fakeReceived': ReturnValues(p3, p4),
             'getAddress': ('127.0.0.1', 10010),
         })
         object_cells = [ Mock({}), ]
@@ -988,12 +922,18 @@ class ClientApplicationTests(NeoUnitTest
             'getCellListForOID': object_cells,
             'getCellListForTID': ReturnValues(history_cells, history_cells),
         })
-        app.cp = self.getConnectionPool([(Mock(), conn)])
+        app.cp = Mock({
+            'iterateForObject': [(Mock(), conn)],
+        })
+        def waitResponses(queue, handler_data):
+            app.setHandlerData(handler_data)
+            app._handlePacket(Mock(), p2, handler=app.storage_handler)
+        app.waitResponses = waitResponses
         # start test here
         result = app.history(oid)
         self.assertEquals(len(result), 2)
-        self.assertEquals(result[0]['tid'], tid1)
-        self.assertEquals(result[1]['tid'], tid2)
+        self.assertEquals(result[0]['tid'], tid2)
+        self.assertEquals(result[1]['tid'], tid1)
         self.assertEquals(result[0]['size'], 42)
         self.assertEquals(result[1]['size'], 42)
 
@@ -1010,43 +950,41 @@ class ClientApplicationTests(NeoUnitTest
         # TODO: test more connection failure cases
         # Seventh packet : askNodeInformation succeeded
         all_passed = []
-        def _waitMessage8(conn, msg_id, handler=None):
+        def _ask8(_):
             all_passed.append(1)
         # Sixth packet : askPartitionTable succeeded
-        def _waitMessage7(conn, msg_id, handler=None):
+        def _ask7(_):
             app.pt = Mock({'operational': True})
-            app._waitMessage = _waitMessage8
         # fifth packet : request node identification succeeded
-        def _waitMessage6(conn, msg_id, handler=None):
+        def _ask6(conn):
             conn.setUUID('D' * 16)
             app.uuid = 'C' * 16
-            app._waitMessage = _waitMessage7
         # fourth iteration : connection to primary master succeeded
-        def _waitMessage5(conn, msg_id, handler=None):
+        def _ask5(_):
             app.trying_master_node = app.primary_master_node = Mock({
                 'getAddress': ('192.168.1.1', 10000),
                 '__str__': 'Fake master node',
             })
-            app._waitMessage = _waitMessage6
         # third iteration : node not ready
-        def _waitMessage4(conn, msg_id, handler=None):
+        def _ask4(_):
             app.trying_master_node = None
-            app._waitMessage = _waitMessage5
         # second iteration : master node changed
-        def _waitMessage3(conn, msg_id, handler=None):
+        def _ask3(_):
             app.primary_master_node = Mock({
                 'getAddress': ('192.168.1.1', 10000),
                 '__str__': 'Fake master node',
             })
-            app._waitMessage = _waitMessage4
         # first iteration : connection failed
-        def _waitMessage2(conn, msg_id, handler=None):
+        def _ask2(_):
             app.trying_master_node = None
-            app._waitMessage = _waitMessage3
         # do nothing for the first call
-        def _waitMessage1(conn, msg_id, handler=None):
-            app._waitMessage = _waitMessage2
-        app._waitMessage = _waitMessage1
+        def _ask1(_):
+            pass
+        ask_func_list = [_ask1, _ask2, _ask3, _ask4, _ask5, _ask6, _ask7,
+            _ask8]
+        def _ask_base(conn, _, handler=None):
+            ask_func_list.pop(0)(conn)
+        app._ask = _ask_base
         # faked environnement
         app.connector_handler = DoNothingConnector
         app.em = Mock({'getConnectionList': []})
@@ -1056,23 +994,6 @@ class ClientApplicationTests(NeoUnitTest
         self.assertTrue(app.master_conn is not None)
         self.assertTrue(app.pt.operational())
 
-    def test_askStorage(self):
-        """ _askStorage is private but test it anyway """
-        app = self.getApp('')
-        conn = Mock()
-        self.test_ok = False
-        def _waitMessage_hook(app, conn, msg_id, handler=None):
-            self.test_ok = True
-        packet = Packets.AskBeginTransaction()
-        packet.setId(0)
-        app._waitMessage = _waitMessage_hook
-        app._askStorage(conn, packet)
-        # check packet sent, connection unlocked and dispatcher updated
-        self.checkAskNewTid(conn)
-        self.checkDispatcherRegisterCalled(app, conn)
-        # and _waitMessage called
-        self.assertTrue(self.test_ok)
-
     def test_askPrimary(self):
         """ _askPrimary is private but test it anyway """
         app = self.getApp('')
@@ -1080,21 +1001,22 @@ class ClientApplicationTests(NeoUnitTest
         app.master_conn = conn
         app.primary_handler = Mock()
         self.test_ok = False
-        def _waitMessage_hook(app, conn, msg_id, handler=None):
+        def _ask_hook(app, conn, packet, handler=None):
+            conn.ask(packet)
             self.assertTrue(handler is app.primary_handler)
             self.test_ok = True
-        _waitMessage_old = Application._waitMessage
-        Application._waitMessage = _waitMessage_hook
+        _ask_old = Application._ask
+        Application._ask = _ask_hook
         packet = Packets.AskBeginTransaction()
         packet.setId(0)
         try:
             app._askPrimary(packet)
         finally:
-            Application._waitMessage = _waitMessage_old
+            Application._ask = _ask_old
         # check packet sent, connection locked during process and dispatcher updated
         self.checkAskNewTid(conn)
         self.checkDispatcherRegisterCalled(app, conn)
-        # and _waitMessage called
+        # and _ask called
         self.assertTrue(self.test_ok)
         # check NEOStorageError is raised when the primary connection is lost
         app.master_conn = None
@@ -1105,15 +1027,16 @@ class ClientApplicationTests(NeoUnitTest
         """ Thread context properties must not be visible accross instances
             while remaining in the same thread """
         app1 = self.getApp()
-        app1_local = app1.local_var
+        app1_local = app1._thread_container.get()
         app2 = self.getApp()
-        app2_local = app2.local_var
+        app2_local = app2._thread_container.get()
         property_id = 'thread_context_test'
-        self.assertFalse(hasattr(app1_local, property_id))
-        self.assertFalse(hasattr(app2_local, property_id))
-        setattr(app1_local, property_id, 'value')
-        self.assertTrue(hasattr(app1_local, property_id))
-        self.assertFalse(hasattr(app2_local, property_id))
+        value = 'value'
+        self.assertRaises(KeyError, app1_local.__getitem__, property_id)
+        self.assertRaises(KeyError, app2_local.__getitem__, property_id)
+        app1_local[property_id] = value
+        self.assertEqual(app1_local[property_id], value)
+        self.assertRaises(KeyError, app2_local.__getitem__, property_id)
 
     def test_pack(self):
         app = self.getApp()

Modified: trunk/neo/tests/client/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -235,7 +235,7 @@ class MasterAnswersHandlerTests(MasterHa
         tid = self.getNextTID()
         conn = self.getConnection()
         self.handler.answerBeginTransaction(conn, tid)
-        calls = self.app.mockGetNamedCalls('setTID')
+        calls = self.app.mockGetNamedCalls('setHandlerData')
         self.assertEqual(len(calls), 1)
         calls[0].checkArgs(tid)
 
@@ -247,18 +247,12 @@ class MasterAnswersHandlerTests(MasterHa
 
     def test_answerTransactionFinished(self):
         conn = self.getConnection()
-        ttid1 = self.getNextTID()
-        ttid2 = self.getNextTID(ttid1)
-        tid2 = self.getNextTID(ttid2)
-        # wrong TID
-        self.app = Mock({'getTID': ttid1})
-        self.assertRaises(NEOStorageError,
-            self.handler.answerTransactionFinished,
-            conn, ttid2, tid2)
-        # matching TID
-        app = Mock({'getTID': ttid2})
-        handler = PrimaryAnswersHandler(app=app)
-        handler.answerTransactionFinished(conn, ttid2, tid2)
+        ttid2 = self.getNextTID()
+        tid2 = self.getNextTID()
+        self.handler.answerTransactionFinished(conn, ttid2, tid2)
+        calls = self.app.mockGetNamedCalls('setHandlerData')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(tid2)
         
     def test_answerPack(self):
         self.assertRaises(NEOStorageError, self.handler.answerPack, None, False)

Modified: trunk/neo/tests/client/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -25,6 +25,7 @@ from neo.client.exception import NEOStor
 from neo.client.exception import NEOStorageDoesNotExistError
 from ZODB.POSException import ConflictError
 from neo.lib.exception import NodeNotReady
+from ZODB.TimeStamp import TimeStamp
 
 MARKER = []
 
@@ -69,45 +70,57 @@ class StorageAnswerHandlerTests(NeoUnitT
     def setUp(self):
         NeoUnitTestBase.setUp(self)
         self.app = Mock()
-        self.app.local_var = Mock()
         self.handler = StorageAnswersHandler(self.app)
 
     def getConnection(self):
         return self.getFakeConnection()
 
+    def _checkHandlerData(self, ref):
+        calls = self.app.mockGetNamedCalls('setHandlerData')
+        self.assertEqual(len(calls), 1)
+        calls[0].checkArgs(ref)
+
     def test_answerObject(self):
         conn = self.getConnection()
         oid = self.getOID(0)
         tid1 = self.getNextTID()
         tid2 = self.getNextTID(tid1)
         the_object = (oid, tid1, tid2, 0, '', 'DATA', None)
-        self.app.local_var.asked_object = None
         self.handler.answerObject(conn, *the_object)
-        self.assertEqual(self.app.local_var.asked_object, the_object[:-1])
+        self._checkHandlerData(the_object[:-1])
         # Check handler raises on non-None data_serial.
         the_object = (oid, tid1, tid2, 0, '', 'DATA', self.getNextTID())
-        self.app.local_var.asked_object = None
         self.assertRaises(NEOStorageError, self.handler.answerObject, conn,
             *the_object)
 
+    def _getAnswerStoreObjectHandler(self, object_stored_counter_dict,
+            conflict_serial_dict, resolved_conflict_serial_dict):
+        app = Mock({
+            'getHandlerData': {
+                'object_stored_counter_dict': object_stored_counter_dict,
+                'conflict_serial_dict': conflict_serial_dict,
+                'resolved_conflict_serial_dict': resolved_conflict_serial_dict,
+            }
+        })
+        return StorageAnswersHandler(app)
+
     def test_answerStoreObject_1(self):
         conn = self.getConnection()
         oid = self.getOID(0)
         tid = self.getNextTID()
         # conflict
-        local_var = self.app.local_var
-        local_var.object_stored_counter_dict = {oid: {}}
-        local_var.conflict_serial_dict = {}
-        local_var.resolved_conflict_serial_dict = {}
-        self.handler.answerStoreObject(conn, 1, oid, tid)
-        self.assertEqual(local_var.conflict_serial_dict[oid], set([tid, ]))
-        self.assertEqual(local_var.object_stored_counter_dict[oid], {})
-        self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
+        object_stored_counter_dict = {oid: {}}
+        conflict_serial_dict = {}
+        resolved_conflict_serial_dict = {}
+        self._getAnswerStoreObjectHandler(object_stored_counter_dict,
+            conflict_serial_dict, resolved_conflict_serial_dict,
+            ).answerStoreObject(conn, 1, oid, tid)
+        self.assertEqual(conflict_serial_dict[oid], set([tid, ]))
+        self.assertEqual(object_stored_counter_dict[oid], {})
+        self.assertFalse(oid in resolved_conflict_serial_dict)
         # object was already accepted by another storage, raise
-        local_var.object_stored_counter_dict = {oid: {tid: 1}}
-        local_var.conflict_serial_dict = {}
-        local_var.resolved_conflict_serial_dict = {}
-        self.assertRaises(NEOStorageError, self.handler.answerStoreObject,
+        handler = self._getAnswerStoreObjectHandler({oid: {tid: 1}}, {}, {})
+        self.assertRaises(NEOStorageError, handler.answerStoreObject,
             conn, 1, oid, tid)
 
     def test_answerStoreObject_2(self):
@@ -116,25 +129,23 @@ class StorageAnswerHandlerTests(NeoUnitT
         tid = self.getNextTID()
         tid_2 = self.getNextTID()
         # resolution-pending conflict
-        local_var = self.app.local_var
-        local_var.object_stored_counter_dict = {oid: {}}
-        local_var.conflict_serial_dict = {oid: set([tid, ])}
-        local_var.resolved_conflict_serial_dict = {}
-        self.handler.answerStoreObject(conn, 1, oid, tid)
-        self.assertEqual(local_var.conflict_serial_dict[oid], set([tid, ]))
-        self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
-        self.assertEqual(local_var.object_stored_counter_dict[oid], {})
+        object_stored_counter_dict = {oid: {}}
+        conflict_serial_dict = {oid: set([tid, ])}
+        resolved_conflict_serial_dict = {}
+        self._getAnswerStoreObjectHandler(object_stored_counter_dict,
+            conflict_serial_dict, resolved_conflict_serial_dict,
+            ).answerStoreObject(conn, 1, oid, tid)
+        self.assertEqual(conflict_serial_dict[oid], set([tid, ]))
+        self.assertFalse(oid in resolved_conflict_serial_dict)
+        self.assertEqual(object_stored_counter_dict[oid], {})
         # object was already accepted by another storage, raise
-        local_var.object_stored_counter_dict = {oid: {tid: 1}}
-        local_var.conflict_serial_dict = {oid: set([tid, ])}
-        local_var.resolved_conflict_serial_dict = {}
-        self.assertRaises(NEOStorageError, self.handler.answerStoreObject,
+        handler = self._getAnswerStoreObjectHandler({oid: {tid: 1}},
+            {oid: set([tid, ])}, {})
+        self.assertRaises(NEOStorageError, handler.answerStoreObject,
             conn, 1, oid, tid)
         # detected conflict is different, don't raise
-        local_var.object_stored_counter_dict = {oid: {}}
-        local_var.conflict_serial_dict = {oid: set([tid, ])}
-        local_var.resolved_conflict_serial_dict = {}
-        self.handler.answerStoreObject(conn, 1, oid, tid_2)
+        self._getAnswerStoreObjectHandler({oid: {}}, {oid: set([tid, ])}, {},
+            ).answerStoreObject(conn, 1, oid, tid_2)
 
     def test_answerStoreObject_3(self):
         conn = self.getConnection()
@@ -145,49 +156,34 @@ class StorageAnswerHandlerTests(NeoUnitT
         # This case happens if a storage is answering a store action for which
         # any other storage already answered (with same conflict) and any other
         # storage accepted the resolved object.
-        local_var = self.app.local_var
-        local_var.object_stored_counter_dict = {oid: {tid_2: 1}}
-        local_var.conflict_serial_dict = {}
-        local_var.resolved_conflict_serial_dict = {oid: set([tid, ])}
-        self.handler.answerStoreObject(conn, 1, oid, tid)
-        self.assertFalse(oid in local_var.conflict_serial_dict)
-        self.assertEqual(local_var.resolved_conflict_serial_dict[oid],
+        object_stored_counter_dict = {oid: {tid_2: 1}}
+        conflict_serial_dict = {}
+        resolved_conflict_serial_dict = {oid: set([tid, ])}
+        self._getAnswerStoreObjectHandler(object_stored_counter_dict,
+            conflict_serial_dict, resolved_conflict_serial_dict,
+            ).answerStoreObject(conn, 1, oid, tid)
+        self.assertFalse(oid in conflict_serial_dict)
+        self.assertEqual(resolved_conflict_serial_dict[oid],
             set([tid, ]))
-        self.assertEqual(local_var.object_stored_counter_dict[oid], {tid_2: 1})
+        self.assertEqual(object_stored_counter_dict[oid], {tid_2: 1})
         # detected conflict is different, don't raise
-        local_var.object_stored_counter_dict = {oid: {tid: 1}}
-        local_var.conflict_serial_dict = {}
-        local_var.resolved_conflict_serial_dict = {oid: set([tid, ])}
-        self.handler.answerStoreObject(conn, 1, oid, tid_2)
+        self._getAnswerStoreObjectHandler({oid: {tid: 1}}, {},
+            {oid: set([tid, ])}).answerStoreObject(conn, 1, oid, tid_2)
 
     def test_answerStoreObject_4(self):
         conn = self.getConnection()
         oid = self.getOID(0)
         tid = self.getNextTID()
         # no conflict
-        local_var = self.app.local_var
-        local_var.object_stored_counter_dict = {oid: {}}
-        local_var.conflict_serial_dict = {}
-        local_var.resolved_conflict_serial_dict = {}
-        self.handler.answerStoreObject(conn, 0, oid, tid)
-        self.assertFalse(oid in local_var.conflict_serial_dict)
-        self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
-        self.assertEqual(local_var.object_stored_counter_dict[oid], {tid: 1})
-
-    def test_answerStoreTransaction(self):
-        conn = self.getConnection()
-        tid1 = self.getNextTID()
-        tid2 = self.getNextTID(tid1)
-        # wrong tid
-        app = Mock({'getTID': tid1})
-        handler = StorageAnswersHandler(app=app)
-        self.assertRaises(NEOStorageError,
-            handler.answerStoreTransaction, conn,
-            tid2)
-        # good tid
-        app = Mock({'getTID': tid2})
-        handler = StorageAnswersHandler(app=app)
-        handler.answerStoreTransaction(conn, tid2)
+        object_stored_counter_dict = {oid: {}}
+        conflict_serial_dict = {}
+        resolved_conflict_serial_dict = {}
+        self._getAnswerStoreObjectHandler(object_stored_counter_dict,
+            conflict_serial_dict, resolved_conflict_serial_dict,
+            ).answerStoreObject(conn, 0, oid, tid)
+        self.assertFalse(oid in conflict_serial_dict)
+        self.assertFalse(oid in resolved_conflict_serial_dict)
+        self.assertEqual(object_stored_counter_dict[oid], {tid: 1})
 
     def test_answerTransactionInformation(self):
         conn = self.getConnection()
@@ -195,24 +191,30 @@ class StorageAnswerHandlerTests(NeoUnitT
         user = 'USER'
         desc = 'DESC'
         ext = 'EXT'
+        packed = False
         oid_list = [self.getOID(0), self.getOID(1)]
-        self.app.local_var.txn_info = None
         self.handler.answerTransactionInformation(conn, tid, user, desc, ext,
-            False, oid_list)
-        txn_info = self.app.local_var.txn_info
-        self.assertTrue(isinstance(txn_info, dict))
-        self.assertEqual(txn_info['user_name'], user)
-        self.assertEqual(txn_info['description'], desc)
-        self.assertEqual(txn_info['id'], tid)
-        self.assertEqual(txn_info['oids'], oid_list)
+            packed, oid_list)
+        self._checkHandlerData(({
+            'time': TimeStamp(tid).timeTime(),
+            'user_name': user,
+            'description': desc,
+            'id': tid,
+            'oids': oid_list,
+            'packed': packed,
+        }, ext))
 
     def test_answerObjectHistory(self):
         conn = self.getConnection()
         oid = self.getOID(0)
-        history_list = []
-        self.app.local_var.history = None
-        self.handler.answerObjectHistory(conn, oid, history_list)
-        self.assertEqual(self.app.local_var.history, (oid, history_list))
+        history_list = [self.getNextTID(), self.getNextTID()]
+        history_set = set()
+        app = Mock({
+            'getHandlerData': history_set,
+        })
+        handler = StorageAnswersHandler(app)
+        handler.answerObjectHistory(conn, oid, history_list)
+        self.assertEqual(history_set, set(history_list))
 
     def test_oidNotFound(self):
         conn = self.getConnection()
@@ -235,10 +237,14 @@ class StorageAnswerHandlerTests(NeoUnitT
         tid2 = self.getNextTID(tid1)
         tid_list = [tid1, tid2]
         conn = self.getFakeConnection(uuid=uuid)
-        self.app.local_var.node_tids = {}
-        self.handler.answerTIDs(conn, tid_list)
-        self.assertTrue(uuid in self.app.local_var.node_tids)
-        self.assertEqual(self.app.local_var.node_tids[uuid], tid_list)
+        tid_set = set()
+        app = Mock({
+            'getHandlerData': tid_set,
+        })
+        handler = StorageAnswersHandler(app)
+
+        handler.answerTIDs(conn, tid_list)
+        self.assertEqual(tid_set, set(tid_list))
 
     def test_answerObjectUndoSerial(self):
         uuid = self.getNewUUID()
@@ -249,12 +255,14 @@ class StorageAnswerHandlerTests(NeoUnitT
         tid1 = self.getNextTID()
         tid2 = self.getNextTID()
         tid3 = self.getNextTID()
-        self.app.local_var.undo_object_tid_dict = undo_dict = {
-            oid1: [tid0, tid1],
-        }
-        self.handler.answerObjectUndoSerial(conn, {
-            oid2: [tid2, tid3],
+        undo_dict = {}
+        app = Mock({
+            'getHandlerData': undo_dict,
         })
+        handler = StorageAnswersHandler(app)
+        handler.answerObjectUndoSerial(conn, {oid1: [tid0, tid1]})
+        self.assertEqual(undo_dict, {oid1: [tid0, tid1]})
+        handler.answerObjectUndoSerial(conn, {oid2: [tid2, tid3]})
         self.assertEqual(undo_dict, {
             oid1: [tid0, tid1],
             oid2: [tid2, tid3],

Modified: trunk/neo/tests/zodb/testBasic.py
==============================================================================
--- trunk/neo/tests/zodb/testBasic.py [iso-8859-1] (original)
+++ trunk/neo/tests/zodb/testBasic.py [iso-8859-1] Tue Jan 25 14:59:01 2011
@@ -22,9 +22,7 @@ from ZODB.tests.StorageTestBase import S
 from neo.tests.zodb import ZODBTestCase
 
 class BasicTests(ZODBTestCase, StorageTestBase, BasicStorage):
-
-    def check_tid_ordering_w_commit(self):
-        self.fail("Test disabled")
+    pass
 
 if __name__ == "__main__":
     suite = unittest.makeSuite(BasicTests, 'check')




More information about the Neo-report mailing list