[Neo-report] r2273 vincent - in /trunk/neo: ./ client/ storage/handlers/ tests/ tests/stor...

nobody at svn.erp5.org nobody at svn.erp5.org
Wed Sep 1 16:26:26 CEST 2010


Author: vincent
Date: Wed Sep  1 16:26:25 2010
New Revision: 2273

Log:
Make it possible for the client to send data_serial to storage nodes.

This makes it possible to implement undo on the client side, as a flaw has
been found in undo when it is used in parallel with replication.

Modified:
    trunk/neo/client/app.py
    trunk/neo/handler.py
    trunk/neo/protocol.py
    trunk/neo/storage/handlers/client.py
    trunk/neo/tests/storage/testClientHandler.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Sep  1 16:26:25 2010
@@ -594,32 +594,37 @@ class Application(object):
             raise StorageTransactionError(self, transaction)
         logging.debug('storing oid %s serial %s',
                      dump(oid), dump(serial))
+        self._store(oid, serial, data)
+        return None
+
+    def _store(self, oid, serial, data, data_serial=None):
         # Find which storage node to use
         cell_list = self._getCellListForOID(oid, writable=True)
         if len(cell_list) == 0:
             raise NEOStorageError
-        if data is None:
+        if data is None or data_serial is not None:
+            assert data is None or data_serial is None, data_serial
             # this is a George Bailey object, stored as an empty string
-            data = ''
-        if self.compress:
-            compressed_data = compress(data)
-            if len(compressed_data) > len(data):
-                compressed_data = data
-                compression = 0
-            else:
-                compression = 1
-        else:
-            compressed_data = data
+            compressed_data = ''
             compression = 0
+        else:
+            assert data_serial is None
+            if self.compress:
+                compressed_data = compress(data)
+                if len(compressed_data) > len(data):
+                    compressed_data = data
+                    compression = 0
+                else:
+                    compression = 1
         checksum = makeChecksum(compressed_data)
         p = Packets.AskStoreObject(oid, serial, compression,
-                 checksum, compressed_data, self.local_var.tid)
+                 checksum, compressed_data, data_serial, self.local_var.tid)
         on_timeout = OnTimeout(self.onStoreTimeout, self.local_var.tid, oid)
         # Store object in tmp cache
         self.local_var.data_dict[oid] = data
         # Store data on each node
         self.local_var.object_stored_counter_dict[oid] = {}
-        self.local_var.object_serial_dict[oid] = (serial, version)
+        self.local_var.object_serial_dict[oid] = serial
         getConnForCell = self.cp.getConnForCell
         queue = self.local_var.queue
         add_involved_nodes = self.local_var.involved_nodes.add
@@ -634,7 +639,6 @@ class Application(object):
                 continue
 
         self._waitAnyMessage(False)
-        return None
 
     def onStoreTimeout(self, conn, msg_id, tid, oid):
         # NOTE: this method is called from poll thread, don't use
@@ -664,7 +668,7 @@ class Application(object):
                 # A later serial has already been resolved, skip.
                 resolved_serial_set.update(conflict_serial_dict.pop(oid))
                 continue
-            serial, version = object_serial_dict[oid]
+            serial = object_serial_dict[oid]
             data = data_dict[oid]
             tid = local_var.tid
             resolved = False
@@ -677,8 +681,7 @@ class Application(object):
                     # Mark this conflict as resolved
                     resolved_serial_set.update(conflict_serial_dict.pop(oid))
                     # Try to store again
-                    self.store(oid, conflict_serial, new_data, version,
-                        local_var.txn)
+                    self._store(oid, conflict_serial, new_data)
                     append(oid)
                     resolved = True
                 else:
@@ -939,7 +942,7 @@ class Application(object):
                 raise UndoError('Some data were modified by a later ' \
                     'transaction', oid)
             else:
-                self.store(oid, data_tid, new_data, '', self.local_var.txn)
+                self._store(oid, data_tid, new_data)
 
         oid_list = self.local_var.txn_info['oids']
         # Consistency checking: all oids of the transaction must have been

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Wed Sep  1 16:26:25 2010
@@ -227,7 +227,7 @@ class EventHandler(object):
         raise UnexpectedPacketError
 
     def askStoreObject(self, conn, oid, serial,
-                             compression, checksum, data, tid):
+                             compression, checksum, data, data_serial, tid):
         raise UnexpectedPacketError
 
     def answerStoreObject(self, conn, conflicting, oid, serial):

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Wed Sep  1 16:26:25 2010
@@ -883,22 +883,26 @@ class AskStoreObject(Packet):
     Ask to store an object. Send an OID, an original serial, a current
     transaction ID, and data. C -> S.
     """
-    _header_format = '!8s8s8sBL'
+    _header_format = '!8s8s8sBL8s'
 
     @profiler_decorator
-    def _encode(self, oid, serial, compression, checksum, data, tid):
+    def _encode(self, oid, serial, compression, checksum, data, data_serial,
+            tid):
         if serial is None:
             serial = INVALID_TID
+        if data_serial is None:
+            data_serial = INVALID_TID
         return pack(self._header_format, oid, serial, tid, compression,
-                          checksum) + _encodeString(data)
+                          checksum, data_serial) + _encodeString(data)
 
     def _decode(self, body):
         header_len = self._header_len
         r = unpack(self._header_format, body[:header_len])
-        oid, serial, tid, compression, checksum = r
+        oid, serial, tid, compression, checksum, data_serial = r
         serial = _decodeTID(serial)
+        data_serial = _decodeTID(data_serial)
         (data, _) = _decodeString(body, 'data', offset=header_len)
-        return (oid, serial, compression, checksum, data, tid)
+        return (oid, serial, compression, checksum, data, data_serial, tid)
 
 class AnswerStoreObject(Packet):
     """

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Wed Sep  1 16:26:25 2010
@@ -47,7 +47,7 @@ class ClientOperationHandler(BaseClientA
         conn.answer(Packets.AnswerStoreTransaction(tid))
 
     def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
-            tid, request_time):
+            data_serial, tid, request_time):
         if tid not in self.app.tm:
             # transaction was aborted, cancel this event
             logging.info('Forget store of %s:%s by %s delayed by %s',
@@ -58,7 +58,7 @@ class ClientOperationHandler(BaseClientA
             return
         try:
             self.app.tm.storeObject(tid, serial, oid, compression,
-                    checksum, data, None)
+                    checksum, data, data_serial)
         except ConflictError, err:
             # resolvable or not
             tid_or_serial = err.getTID()
@@ -75,11 +75,11 @@ class ClientOperationHandler(BaseClientA
             conn.answer(Packets.AnswerStoreObject(0, oid, serial))
 
     def askStoreObject(self, conn, oid, serial,
-                             compression, checksum, data, tid):
+                             compression, checksum, data, data_serial, tid):
         # register the transaction
         self.app.tm.register(conn.getUUID(), tid)
         self._askStoreObject(conn, oid, serial, compression, checksum, data,
-            tid, time.time())
+            data_serial, tid, time.time())
 
     def askTIDs(self, conn, first, last, partition):
         # This method is complicated, because I must return TIDs only

Modified: trunk/neo/tests/storage/testClientHandler.py
==============================================================================
--- trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] Wed Sep  1 16:26:25 2010
@@ -215,10 +215,11 @@ class StorageClientHandlerTests(NeoTestB
         conn = self._getConnection(uuid=uuid)
         tid = self.getNextTID()
         oid, serial, comp, checksum, data = self._getObject()
+        data_tid = self.getNextTID()
         self.operation.askStoreObject(conn, oid, serial, comp, checksum, 
-                data, tid)
+                data, data_tid, tid)
         self._checkStoreObjectCalled(tid, serial, oid, comp,
-                checksum, data, None)
+                checksum, data, data_tid)
         pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
             decode=True)
         self.assertEqual(pconflicting, 0)
@@ -235,8 +236,9 @@ class StorageClientHandlerTests(NeoTestB
             raise ConflictError(locking_tid)
         self.app.tm.storeObject = fakeStoreObject
         oid, serial, comp, checksum, data = self._getObject()
+        data_tid = self.getNextTID()
         self.operation.askStoreObject(conn, oid, serial, comp, checksum, 
-                data, tid)
+                data, data_tid, tid)
         pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
             decode=True)
         self.assertEqual(pconflicting, 1)

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Wed Sep  1 16:26:25 2010
@@ -348,11 +348,13 @@ class ProtocolTests(NeoTestBase):
         oid = self.getNextTID()
         serial = self.getNextTID()
         tid = self.getNextTID()
-        p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid)
-        poid, pserial, compression, checksum, data, ptid = p.decode()
+        tid2 = self.getNextTID()
+        p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid2, tid)
+        poid, pserial, compression, checksum, data, ptid2, ptid = p.decode()
         self.assertEqual(oid, poid)
         self.assertEqual(serial, pserial)
         self.assertEqual(tid, ptid)
+        self.assertEqual(tid2, ptid2)
         self.assertEqual(compression, 1)
         self.assertEqual(checksum, 55)
         self.assertEqual(data, "to")





More information about the Neo-report mailing list