[Neo-report] r1788 vincent - in /trunk/neo: client/ client/handlers/ tests/client/

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Feb 17 18:40:44 CET 2010


Author: vincent
Date: Wed Feb 17 18:40:43 2010
New Revision: 1788

Log:
Pipeline "store" action on client side.

Storage.store calls can be pipelined when implementation can take advantage of
it (as Zeo does). This allows reducing the impact of (network-induced, mainly)
latency by sending all objects to storages without waiting for storage answer.

Modified:
    trunk/neo/client/Storage.py
    trunk/neo/client/app.py
    trunk/neo/client/exception.py
    trunk/neo/client/handlers/storage.py
    trunk/neo/tests/client/testClientApp.py
    trunk/neo/tests/client/testStorageHandler.py

Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Wed Feb 17 18:40:43 2010
@@ -18,8 +18,7 @@
 from ZODB import BaseStorage, ConflictResolution, POSException
 
 from neo.client.app import Application
-from neo.client.exception import NEOStorageConflictError, \
-        NEOStorageNotFoundError
+from neo.client.exception import NEOStorageNotFoundError
 
 class Storage(BaseStorage.BaseStorage,
               ConflictResolution.ConflictResolvingStorage):
@@ -61,7 +60,8 @@
     def tpc_vote(self, transaction):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        return self.app.tpc_vote(transaction=transaction)
+        return self.app.tpc_vote(transaction=transaction,
+            tryToResolveConflict=self.tryToResolveConflict)
 
     def tpc_abort(self, transaction):
         if self._is_read_only:
@@ -72,30 +72,11 @@
         return self.app.tpc_finish(transaction=transaction, f=f)
 
     def store(self, oid, serial, data, version, transaction):
-        app = self.app
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        try:
-            return app.store(oid = oid, serial = serial,
-                             data = data, version = version,
-                             transaction = transaction)
-        except NEOStorageConflictError:
-            conflict_serial = app.getConflictSerial()
-            tid = app.getTID()
-            if conflict_serial <= tid:
-                # Try to resolve conflict only if conflicting serial is older
-                # than the current transaction ID
-                new_data = self.tryToResolveConflict(oid,
-                                                     conflict_serial,
-                                                     serial, data)
-                if new_data is not None:
-                    # Try again after conflict resolution
-                    self.store(oid, conflict_serial,
-                               new_data, version, transaction)
-                    return ConflictResolution.ResolvedSerial
-            raise POSException.ConflictError(oid=oid,
-                                             serials=(tid,
-                                                      serial),data=data)
+        return self.app.store(oid=oid, serial=serial,
+            data=data, version=version, transaction=transaction,
+            tryToResolveConflict=self.tryToResolveConflict)
 
     def getSerial(self, oid):
         try:
@@ -123,11 +104,8 @@
     def undo(self, transaction_id, txn):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        try:
-            return self.app.undo(transaction_id = transaction_id,
-                                 txn = txn, wrapper = self)
-        except NEOStorageConflictError:
-            raise POSException.ConflictError
+        return self.app.undo(transaction_id=transaction_id, txn=txn,
+            tryToResolveConflict=self.tryToResolveConflict)
 
 
     def undoLog(self, first, last, filter):

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Feb 17 18:40:43 2010
@@ -23,6 +23,7 @@
 from time import sleep
 
 from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
+from ZODB.ConflictResolution import ResolvedSerial
 
 from neo import setupLog
 setupLog('CLIENT', verbose=True)
@@ -36,7 +37,7 @@
 from neo.connection import MTClientConnection
 from neo.node import NodeManager
 from neo.connector import getConnectorHandler
-from neo.client.exception import NEOStorageError, NEOStorageConflictError
+from neo.client.exception import NEOStorageError
 from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
 from neo.exception import NeoException
 from neo.client.handlers import storage, master
@@ -80,6 +81,9 @@
             'tid': None,
             'txn': None,
             'data_dict': {},
+            'object_serial_dict': {},
+            'object_stored_counter_dict': {},
+            'conflict_serial_dict': {},
             'object_stored': 0,
             'txn_voted': False,
             'txn_finished': False,
@@ -88,9 +92,7 @@
             'history': None,
             'node_tids': {},
             'node_ready': False,
-            'conflict_serial': 0,
             'asked_object': 0,
-            'object_stored_counter': 0,
         }
 
 
@@ -534,7 +536,8 @@
         self.local_var.txn = transaction
 
 
-    def store(self, oid, serial, data, version, transaction):
+    def store(self, oid, serial, data, version, transaction,
+        tryToResolveConflict):
         """Store object."""
         if transaction is not self.local_var.txn:
             raise StorageTransactionError(self, transaction)
@@ -551,49 +554,100 @@
         checksum = makeChecksum(compressed_data)
         p = Packets.AskStoreObject(oid, serial, 1,
                  checksum, compressed_data, self.local_var.tid)
+        # Store object in tmp cache
+        self.local_var.data_dict[oid] = data
         # Store data on each node
-        self.local_var.object_stored_counter = 0
+        self.local_var.object_stored_counter_dict[oid] = 0
+        self.local_var.object_serial_dict[oid] = (serial, version)
+        local_queue = self.local_var.queue
         for cell in cell_list:
             conn = self.cp.getConnForCell(cell)
             if conn is None:
                 continue
-
-            self.local_var.object_stored = 0
-            try:
-                self._askStorage(conn, p)
+            try:
+                try:
+                    conn.ask(local_queue, p)
+                finally:
+                    conn.unlock()
             except ConnectionClosed:
                 continue
 
-            # Check we don't get any conflict
-            if self.local_var.object_stored[0] == -1:
-                if self.local_var.data_dict.has_key(oid):
-                    # One storage already accept the object, is it normal ??
-                    # remove from dict and raise ConflictError, don't care of
-                    # previous node which already store data as it would be
-                    # resent again if conflict is resolved or txn will be
-                    # aborted
-                    del self.local_var.data_dict[oid]
-                self.local_var.conflict_serial = self.local_var.object_stored[1]
-                raise NEOStorageConflictError
-            # increase counter so that we know if a node has stored the object
-            # or not
-            self.local_var.object_stored_counter += 1
-
-        if self.local_var.object_stored_counter == 0:
-            # no storage nodes were available
-            raise NEOStorageError('tpc_store failed')
-
-        # Store object in tmp cache
-        self.local_var.data_dict[oid] = data
-
-        return self.local_var.tid
-
-
-    def tpc_vote(self, transaction):
+        self._waitAnyMessage(False)
+        return None
+
+    def _handleConflicts(self, tryToResolveConflict):
+        result = []
+        append = result.append
+        local_var = self.local_var
+        # Check for conflicts
+        data_dict = local_var.data_dict
+        object_serial_dict = local_var.object_serial_dict
+        for oid, conflict_serial in local_var.conflict_serial_dict.items():
+            serial, version = object_serial_dict[oid]
+            data = data_dict[oid]
+            tid = local_var.tid
+            resolved = False
+            if conflict_serial <= tid:
+                new_data = tryToResolveConflict(oid, conflict_serial, serial,
+                    data)
+                if new_data is not None:
+                    # Forget this conflict
+                    del local_var.conflict_serial_dict[oid]
+                    # Try to store again
+                    self.store(oid, conflict_serial, new_data, version,
+                        local_var.txn, tryToResolveConflict)
+                    append(oid)
+                    resolved = True
+            if not resolved:
+                # XXX: Is it really required to remove from data_dict ?
+                del data_dict[oid]
+                raise ConflictError(oid=oid,
+                    serials=(tid, serial), data=data)
+        return result
+
+    def waitStoreResponses(self, tryToResolveConflict):
+        result = []
+        append = result.append
+        resolved_oid_set = set()
+        update = resolved_oid_set.update
+        local_var = self.local_var
+        queue = self.local_var.queue
+        tid = local_var.tid
+        _waitAnyMessage = self._waitAnyMessage
+        _handleConflicts = self._handleConflicts
+        pending = self.dispatcher.pending
+        while True:
+            # Wait for all requests to be answered (or their connection to be
+            # dected as closed)
+            while pending(queue):
+                _waitAnyMessage()
+            conflicts = _handleConflicts(tryToResolveConflict)
+            if conflicts:
+                update(conflicts)
+            else:
+                # No more conflict resolutions to do, no more pending store
+                # requests
+                break
+
+        # Check for never-stored objects, and update result for all others
+        for oid, store_count in \
+            local_var.object_stored_counter_dict.iteritems():
+            if store_count == 0:
+                raise NEOStorageError('tpc_store failed')
+            elif oid in resolved_oid_set:
+                append((oid, ResolvedSerial))
+            else:
+                append((oid, tid))
+        return result
+
+    def tpc_vote(self, transaction, tryToResolveConflict):
         """Store current transaction."""
         local_var = self.local_var
         if transaction is not local_var.txn:
             raise StorageTransactionError(self, transaction)
+
+        result = self.waitStoreResponses(tryToResolveConflict)
+
         tid = local_var.tid
         # Store data on each node
         voted_counter = 0
@@ -626,6 +680,8 @@
         # tpc_finish.
         self._getMasterConnection()
 
+        return result
+
     def tpc_abort(self, transaction):
         """Abort current transaction."""
         if transaction is not self.local_var.txn:
@@ -690,7 +746,7 @@
         finally:
             self._load_lock_release()
 
-    def undo(self, transaction_id, txn, wrapper):
+    def undo(self, transaction_id, txn, tryToResolveConflict):
         if txn is not self.local_var.txn:
             raise StorageTransactionError(self, transaction_id)
 
@@ -739,19 +795,9 @@
         # Third do transaction with old data
         oid_list = data_dict.keys()
         for oid in oid_list:
-            data = data_dict[oid]
-            try:
-                self.store(oid, transaction_id, data, None, txn)
-            except NEOStorageConflictError, serial:
-                if serial <= self.local_var.tid:
-                    new_data = wrapper.tryToResolveConflict(oid,
-                            self.local_var.tid, serial, data)
-                    if new_data is not None:
-                        self.store(oid, self.local_var.tid, new_data, None, txn)
-                        continue
-                raise ConflictError(oid = oid, serials = (self.local_var.tid,
-                    serial),
-                                    data = data)
+            self.store(oid, transaction_id, data_dict[oid], None, txn,
+                tryToResolveConflict)
+        self.waitStoreResponses(tryToResolveConflict)
         return self.local_var.tid, oid_list
 
     def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
@@ -930,9 +976,6 @@
     def getTID(self):
         return self.local_var.tid
 
-    def getConflictSerial(self):
-        return self.local_var.conflict_serial
-
     def setTransactionFinished(self):
         self.local_var.txn_finished = True
 

Modified: trunk/neo/client/exception.py
==============================================================================
--- trunk/neo/client/exception.py [iso-8859-1] (original)
+++ trunk/neo/client/exception.py [iso-8859-1] Wed Feb 17 18:40:43 2010
@@ -23,8 +23,5 @@
 class NEOStorageError(POSException.StorageError):
     pass
 
-class NEOStorageConflictError(NEOStorageError):
-    pass
-
 class NEOStorageNotFoundError(NEOStorageError):
     pass

Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Wed Feb 17 18:40:43 2010
@@ -63,10 +63,18 @@
                 compression, checksum, data)
 
     def answerStoreObject(self, conn, conflicting, oid, serial):
+        local_var = self.app.local_var
+        object_stored_counter_dict = local_var.object_stored_counter_dict
         if conflicting:
-            self.app.local_var.object_stored = -1, serial
+            assert object_stored_counter_dict[oid] == 0, \
+                object_stored_counter_dict[oid]
+            previous_conflict_serial = local_var.conflict_serial_dict.get(oid,
+                None)
+            assert previous_conflict_serial in (None, serial), \
+                (previous_conflict_serial, serial)
+            local_var.conflict_serial_dict[oid] = serial
         else:
-            self.app.local_var.object_stored = oid, serial
+            object_stored_counter_dict[oid] += 1
 
     def answerStoreTransaction(self, conn, tid):
         if tid != self.app.getTID():

Modified: trunk/neo/tests/client/testClientApp.py
==============================================================================
--- trunk/neo/tests/client/testClientApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testClientApp.py [iso-8859-1] Wed Feb 17 18:40:43 2010
@@ -20,8 +20,7 @@
 from ZODB.POSException import StorageTransactionError, UndoError, ConflictError
 from neo.tests import NeoTestBase
 from neo.client.app import Application
-from neo.client.exception import NEOStorageError, NEOStorageNotFoundError, \
-        NEOStorageConflictError
+from neo.client.exception import NEOStorageError, NEOStorageNotFoundError
 from neo import protocol
 from neo.protocol import Packets, INVALID_TID, INVALID_SERIAL
 from neo.util import makeChecksum
@@ -48,6 +47,12 @@
         raise NotImplementedError
     else:
         handler.dispatch(conn, conn.fakeReceived())
+
+def resolving_tryToResolveConflict(oid, conflict_serial, serial, data):
+  return data
+
+def failing_tryToResolveConflict(oid, conflict_serial, serial, data):
+  return None
 
 class ClientApplicationTests(NeoTestBase):
 
@@ -130,7 +135,7 @@
         cell = Mock({ 'getAddress': 'FakeServer', 'getState': 'FakeState', })
         app.pt = Mock({ 'getCellListForTID': (cell, cell, ) })
         app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
-        app.tpc_vote(txn)
+        app.tpc_vote(txn, resolving_tryToResolveConflict)
 
     def askFinishTransaction(self, app):
         txn = app.local_var.txn
@@ -399,14 +404,16 @@
         # invalid transaction > StorageTransactionError
         app.local_var.txn = old_txn = object()
         self.assertTrue(app.local_var.txn is not txn)
-        self.assertRaises(StorageTransactionError, app.store, oid, tid, '', None, txn)
+        self.assertRaises(StorageTransactionError, app.store, oid, tid, '',
+            None, txn, resolving_tryToResolveConflict)
         self.assertEquals(app.local_var.txn, old_txn)
         # check partition_id and an empty cell list -> NEOStorageError
         app.local_var.txn = txn
         app.local_var.tid = tid
         app.pt = Mock({ 'getCellListForOID': (), })
         app.num_partitions = 2
-        self.assertRaises(NEOStorageError, app.store, oid, tid, '',  None, txn)
+        self.assertRaises(NEOStorageError, app.store, oid, tid, '',  None,
+            txn, resolving_tryToResolveConflict)
         calls = app.pt.mockGetNamedCalls('getCellListForOID')
         self.assertEquals(len(calls), 1)
         self.assertEquals(calls[0].getParam(0), oid) # oid=11
@@ -421,9 +428,10 @@
         app.local_var.tid = tid
         packet = Packets.AnswerStoreObject(conflicting=1, oid=oid, serial=tid)
         packet.setId(0)
-        conn = Mock({
-            'getNextId': 1,
-            'fakeReceived': packet,
+        storage_address = ('127.0.0.1', 10020)
+        conn = Mock({
+            'getNextId': 1,
+            'getAddress': storage_address,
         })
         cell = Mock({
             'getAddress': 'FakeServer',
@@ -431,15 +439,21 @@
         })
         app.pt = Mock({ 'getCellListForOID': (cell, cell, )})
         app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn)})
-        app.dispatcher = Mock({})
+        class Dispatcher(object):
+            def pending(self, queue): 
+                return not queue.empty()
+        app.dispatcher = Dispatcher()
+        app.nm.createStorage(address=storage_address)
         app.local_var.object_stored = (oid, tid)
         app.local_var.data_dict[oid] = 'BEFORE'
-        self.assertRaises(NEOStorageConflictError, app.store, oid, tid, '', None, txn)
+        app.store(oid, tid, '', None, txn, failing_tryToResolveConflict)
+        app.local_var.queue.put((conn, packet))
+        self.assertRaises(ConflictError, app.waitStoreResponses,
+            failing_tryToResolveConflict)
         self.assertTrue(oid not in app.local_var.data_dict)
-        self.assertEquals(app.getConflictSerial(), tid)
-        self.assertEquals(app.local_var.object_stored, (-1, tid))
+        self.assertEquals(app.local_var.conflict_serial_dict[oid], tid)
+        self.assertEquals(app.local_var.object_stored_counter_dict[oid], 0)
         self.checkAskStoreObject(conn)
-        self.checkDispatcherRegisterCalled(app, conn)
 
     def test_store3(self):
         app = self.getApp()
@@ -451,9 +465,10 @@
         app.local_var.tid = tid
         packet = Packets.AnswerStoreObject(conflicting=0, oid=oid, serial=tid)
         packet.setId(0)
-        conn = Mock({
-            'getNextId': 1,
-            'fakeReceived': packet,
+        storage_address = ('127.0.0.1', 10020)
+        conn = Mock({
+            'getNextId': 1,
+            'getAddress': storage_address,
         })
         app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn, ) })
         cell = Mock({
@@ -461,15 +476,18 @@
             'getState': 'FakeState',
         })
         app.pt = Mock({ 'getCellListForOID': (cell, cell, ) })
-        app.dispatcher = Mock({})
-        app.conflict_serial = None # reset by hand
-        app.local_var.object_stored = ()
-        app.store(oid, tid, 'DATA', None, txn)
-        self.assertEquals(app.local_var.object_stored, (oid, tid))
+        class Dispatcher(object):
+            def pending(self, queue): 
+                return not queue.empty()
+        app.dispatcher = Dispatcher()
+        app.nm.createStorage(address=storage_address)
+        app.store(oid, tid, 'DATA', None, txn, resolving_tryToResolveConflict)
+        self.checkAskStoreObject(conn)
+        app.local_var.queue.put((conn, packet))
+        app.waitStoreResponses(resolving_tryToResolveConflict)
+        self.assertEquals(app.local_var.object_stored_counter_dict[oid], 1)
         self.assertEquals(app.local_var.data_dict.get(oid, None), 'DATA')
-        self.assertNotEquals(app.conflict_serial, tid)
-        self.checkAskStoreObject(conn)
-        self.checkDispatcherRegisterCalled(app, conn)
+        self.assertFalse(oid in app.local_var.conflict_serial_dict)
 
     def test_tpc_vote1(self):
         app = self.getApp()
@@ -478,7 +496,8 @@
         # invalid transaction > StorageTransactionError
         app.local_var.txn = old_txn = object()
         self.assertTrue(app.local_var.txn is not txn)
-        self.assertRaises(StorageTransactionError, app.tpc_vote, txn)
+        self.assertRaises(StorageTransactionError, app.tpc_vote, txn,
+            resolving_tryToResolveConflict)
         self.assertEquals(app.local_var.txn, old_txn)
 
     def test_tpc_vote2(self):
@@ -504,7 +523,8 @@
         app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
         app.dispatcher = Mock()
         app.tpc_begin(txn, tid)
-        self.assertRaises(NEOStorageError, app.tpc_vote, txn)
+        self.assertRaises(NEOStorageError, app.tpc_vote, txn,
+            resolving_tryToResolveConflict)
         calls = conn.mockGetNamedCalls('ask')
         self.assertEquals(len(calls), 1)
         packet = calls[0].getParam(1)
@@ -531,7 +551,7 @@
         app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
         app.dispatcher = Mock()
         app.tpc_begin(txn, tid)
-        app.tpc_vote(txn)
+        app.tpc_vote(txn, resolving_tryToResolveConflict)
         self.checkAskStoreTransaction(conn)
         self.checkDispatcherRegisterCalled(app, conn)
 
@@ -668,18 +688,21 @@
         app = self.getApp()
         tid = self.makeTID()
         txn = self.makeTransactionObject()
-        wrapper = Mock()
+        marker = []
+        def tryToResolveConflict(oid, conflict_serial, serial, data):
+            marker.append(1)
         app.local_var.txn = old_txn = object()
         app.master_conn = Mock()
         self.assertFalse(app.local_var.txn is txn)
         conn = Mock()
         cell = Mock()
-        self.assertRaises(StorageTransactionError, app.undo, tid, txn, wrapper)
+        self.assertRaises(StorageTransactionError, app.undo, tid, txn,
+            tryToResolveConflict)
         # no packet sent
         self.checkNoPacketSent(conn)
         self.checkNoPacketSent(app.master_conn)
         # nothing done
-        self.assertEquals(len(wrapper.mockGetNamedCalls('tryToResolveConflict')), 0)
+        self.assertEquals(marker, [])
         self.assertEquals(app.local_var.txn, old_txn)
 
     def test_undo2(self):
@@ -724,13 +747,19 @@
         u4p2 = Packets.AnswerObject(oid2, tid3, tid3, 0, makeChecksum('O2V2'), 'O2V2')
         u4p3 = Packets.AnswerStoreObject(conflicting=0, oid=oid2, serial=tid2)
         # test logic
-        packets = (u1p1, u1p2, u2p1, u2p2, u3p1, u3p2, u3p3, u3p1, u4p2, u4p3)
+        packets = (u1p1, u1p2, u2p1, u2p2, u3p1, u3p2, u3p3, u4p1, u4p2, u4p3)
         for i, p in enumerate(packets):
             p.setId(p)
-        conn = Mock({
-            'getNextId': 1,
-            'fakeReceived': ReturnValues(*packets),
-            'getAddress': ('127.0.0.1', 10010),
+        storage_address = ('127.0.0.1', 10010)
+        conn = Mock({
+            'getNextId': 1,
+            'fakeReceived': ReturnValues(
+                u1p1, u1p2,
+                u2p1, u2p2,
+                u4p1, u4p2,
+                u3p1, u3p2,
+            ),
+            'getAddress': storage_address,
         })
         cell = Mock({ 'getAddress': 'FakeServer', 'getState': 'FakeState', })
         app.pt = Mock({
@@ -738,14 +767,27 @@
             'getCellListForOID': (cell, ),
         })
         app.cp = Mock({ 'getConnForCell': conn})
-        wrapper = Mock({'tryToResolveConflict': None})
+        marker = []
+        def tryToResolveConflict(oid, conflict_serial, serial, data):
+            marker.append(1)
+        class Dispatcher(object):
+            def pending(self, queue): 
+                return not queue.empty()
+        app.dispatcher = Dispatcher()
+        app.nm.createStorage(address=storage_address)
         txn4 = self.beginTransaction(app, tid=tid4)
         # all start here
-        self.assertRaises(UndoError, app.undo, tid1, txn4, wrapper)
-        self.assertRaises(UndoError, app.undo, tid2, txn4, wrapper)
-        self.assertRaises(ConflictError, app.undo, tid3, txn4, wrapper)
-        self.assertEquals(len(wrapper.mockGetNamedCalls('tryToResolveConflict')), 1)
-        self.assertEquals(app.undo(tid3, txn4, wrapper), (tid4, [oid2, ]))
+        self.assertRaises(UndoError, app.undo, tid1, txn4,
+            tryToResolveConflict)
+        self.assertRaises(UndoError, app.undo, tid2, txn4,
+            tryToResolveConflict)
+        app.local_var.queue.put((conn, u4p3))
+        self.assertEquals(app.undo(tid3, txn4, tryToResolveConflict),
+            (tid4, [oid2, ]))
+        app.local_var.queue.put((conn, u3p3))
+        self.assertRaises(ConflictError, app.undo, tid3, txn4,
+            tryToResolveConflict)
+        self.assertEquals(marker, [1])
         self.askFinishTransaction(app)
 
     def test_undoLog(self):

Modified: trunk/neo/tests/client/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testStorageHandler.py [iso-8859-1] Wed Feb 17 18:40:43 2010
@@ -86,13 +86,18 @@
         oid = self.getOID(0)
         tid = self.getNextTID()
         # conflict
-        self.app.local_var.object_stored = None
+        local_var = self.app.local_var
+        local_var.object_stored_counter_dict = {oid: 0}
+        local_var.conflict_serial_dict = {}
         self.handler.answerStoreObject(conn, 1, oid, tid)
-        self.assertEqual(self.app.local_var.object_stored, (-1, tid))
+        self.assertEqual(local_var.conflict_serial_dict[oid], tid)
+        self.assertFalse(local_var.object_stored_counter_dict[oid], 0)
         # no conflict
-        self.app.local_var.object_stored = None
+        local_var.object_stored_counter_dict = {oid: 0}
+        local_var.conflict_serial_dict = {}
         self.handler.answerStoreObject(conn, 0, oid, tid)
-        self.assertEqual(self.app.local_var.object_stored, (oid, tid))
+        self.assertFalse(oid in local_var.conflict_serial_dict)
+        self.assertEqual(local_var.object_stored_counter_dict[oid], 1)
 
     def test_answerStoreTransaction(self):
         conn = self.getConnection()





More information about the Neo-report mailing list