[Neo-report] r2273 vincent - in /trunk/neo: ./ client/ storage/handlers/ tests/ tests/storage/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Sep 1 16:26:26 CEST 2010
Author: vincent
Date: Wed Sep 1 16:26:25 2010
New Revision: 2273
Log:
Make it possible for the client to send data_serial to storage nodes.
This makes it possible to implement undo on the client side, as a flaw has
been found in undo when it is used in parallel with replication.
Modified:
trunk/neo/client/app.py
trunk/neo/handler.py
trunk/neo/protocol.py
trunk/neo/storage/handlers/client.py
trunk/neo/tests/storage/testClientHandler.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Wed Sep 1 16:26:25 2010
@@ -594,32 +594,37 @@ class Application(object):
raise StorageTransactionError(self, transaction)
logging.debug('storing oid %s serial %s',
dump(oid), dump(serial))
+ self._store(oid, serial, data)
+ return None
+
+ def _store(self, oid, serial, data, data_serial=None):
# Find which storage node to use
cell_list = self._getCellListForOID(oid, writable=True)
if len(cell_list) == 0:
raise NEOStorageError
- if data is None:
+ if data is None or data_serial is not None:
+ assert data is None or data_serial is None, data_serial
# this is a George Bailey object, stored as an empty string
- data = ''
- if self.compress:
- compressed_data = compress(data)
- if len(compressed_data) > len(data):
- compressed_data = data
- compression = 0
- else:
- compression = 1
- else:
- compressed_data = data
+ compressed_data = ''
compression = 0
+ else:
+ assert data_serial is None
+ if self.compress:
+ compressed_data = compress(data)
+ if len(compressed_data) > len(data):
+ compressed_data = data
+ compression = 0
+ else:
+ compression = 1
checksum = makeChecksum(compressed_data)
p = Packets.AskStoreObject(oid, serial, compression,
- checksum, compressed_data, self.local_var.tid)
+ checksum, compressed_data, data_serial, self.local_var.tid)
on_timeout = OnTimeout(self.onStoreTimeout, self.local_var.tid, oid)
# Store object in tmp cache
self.local_var.data_dict[oid] = data
# Store data on each node
self.local_var.object_stored_counter_dict[oid] = {}
- self.local_var.object_serial_dict[oid] = (serial, version)
+ self.local_var.object_serial_dict[oid] = serial
getConnForCell = self.cp.getConnForCell
queue = self.local_var.queue
add_involved_nodes = self.local_var.involved_nodes.add
@@ -634,7 +639,6 @@ class Application(object):
continue
self._waitAnyMessage(False)
- return None
def onStoreTimeout(self, conn, msg_id, tid, oid):
# NOTE: this method is called from poll thread, don't use
@@ -664,7 +668,7 @@ class Application(object):
# A later serial has already been resolved, skip.
resolved_serial_set.update(conflict_serial_dict.pop(oid))
continue
- serial, version = object_serial_dict[oid]
+ serial = object_serial_dict[oid]
data = data_dict[oid]
tid = local_var.tid
resolved = False
@@ -677,8 +681,7 @@ class Application(object):
# Mark this conflict as resolved
resolved_serial_set.update(conflict_serial_dict.pop(oid))
# Try to store again
- self.store(oid, conflict_serial, new_data, version,
- local_var.txn)
+ self._store(oid, conflict_serial, new_data)
append(oid)
resolved = True
else:
@@ -939,7 +942,7 @@ class Application(object):
raise UndoError('Some data were modified by a later ' \
'transaction', oid)
else:
- self.store(oid, data_tid, new_data, '', self.local_var.txn)
+ self._store(oid, data_tid, new_data)
oid_list = self.local_var.txn_info['oids']
# Consistency checking: all oids of the transaction must have been
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Wed Sep 1 16:26:25 2010
@@ -227,7 +227,7 @@ class EventHandler(object):
raise UnexpectedPacketError
def askStoreObject(self, conn, oid, serial,
- compression, checksum, data, tid):
+ compression, checksum, data, data_serial, tid):
raise UnexpectedPacketError
def answerStoreObject(self, conn, conflicting, oid, serial):
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Wed Sep 1 16:26:25 2010
@@ -883,22 +883,26 @@ class AskStoreObject(Packet):
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
"""
- _header_format = '!8s8s8sBL'
+ _header_format = '!8s8s8sBL8s'
@profiler_decorator
- def _encode(self, oid, serial, compression, checksum, data, tid):
+ def _encode(self, oid, serial, compression, checksum, data, data_serial,
+ tid):
if serial is None:
serial = INVALID_TID
+ if data_serial is None:
+ data_serial = INVALID_TID
return pack(self._header_format, oid, serial, tid, compression,
- checksum) + _encodeString(data)
+ checksum, data_serial) + _encodeString(data)
def _decode(self, body):
header_len = self._header_len
r = unpack(self._header_format, body[:header_len])
- oid, serial, tid, compression, checksum = r
+ oid, serial, tid, compression, checksum, data_serial = r
serial = _decodeTID(serial)
+ data_serial = _decodeTID(data_serial)
(data, _) = _decodeString(body, 'data', offset=header_len)
- return (oid, serial, compression, checksum, data, tid)
+ return (oid, serial, compression, checksum, data, data_serial, tid)
class AnswerStoreObject(Packet):
"""
Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Wed Sep 1 16:26:25 2010
@@ -47,7 +47,7 @@ class ClientOperationHandler(BaseClientA
conn.answer(Packets.AnswerStoreTransaction(tid))
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
- tid, request_time):
+ data_serial, tid, request_time):
if tid not in self.app.tm:
# transaction was aborted, cancel this event
logging.info('Forget store of %s:%s by %s delayed by %s',
@@ -58,7 +58,7 @@ class ClientOperationHandler(BaseClientA
return
try:
self.app.tm.storeObject(tid, serial, oid, compression,
- checksum, data, None)
+ checksum, data, data_serial)
except ConflictError, err:
# resolvable or not
tid_or_serial = err.getTID()
@@ -75,11 +75,11 @@ class ClientOperationHandler(BaseClientA
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
def askStoreObject(self, conn, oid, serial,
- compression, checksum, data, tid):
+ compression, checksum, data, data_serial, tid):
# register the transaction
self.app.tm.register(conn.getUUID(), tid)
self._askStoreObject(conn, oid, serial, compression, checksum, data,
- tid, time.time())
+ data_serial, tid, time.time())
def askTIDs(self, conn, first, last, partition):
# This method is complicated, because I must return TIDs only
Modified: trunk/neo/tests/storage/testClientHandler.py
==============================================================================
--- trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testClientHandler.py [iso-8859-1] Wed Sep 1 16:26:25 2010
@@ -215,10 +215,11 @@ class StorageClientHandlerTests(NeoTestB
conn = self._getConnection(uuid=uuid)
tid = self.getNextTID()
oid, serial, comp, checksum, data = self._getObject()
+ data_tid = self.getNextTID()
self.operation.askStoreObject(conn, oid, serial, comp, checksum,
- data, tid)
+ data, data_tid, tid)
self._checkStoreObjectCalled(tid, serial, oid, comp,
- checksum, data, None)
+ checksum, data, data_tid)
pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
decode=True)
self.assertEqual(pconflicting, 0)
@@ -235,8 +236,9 @@ class StorageClientHandlerTests(NeoTestB
raise ConflictError(locking_tid)
self.app.tm.storeObject = fakeStoreObject
oid, serial, comp, checksum, data = self._getObject()
+ data_tid = self.getNextTID()
self.operation.askStoreObject(conn, oid, serial, comp, checksum,
- data, tid)
+ data, data_tid, tid)
pconflicting, poid, pserial = self.checkAnswerStoreObject(conn,
decode=True)
self.assertEqual(pconflicting, 1)
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Wed Sep 1 16:26:25 2010
@@ -348,11 +348,13 @@ class ProtocolTests(NeoTestBase):
oid = self.getNextTID()
serial = self.getNextTID()
tid = self.getNextTID()
- p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid)
- poid, pserial, compression, checksum, data, ptid = p.decode()
+ tid2 = self.getNextTID()
+ p = Packets.AskStoreObject(oid, serial, 1, 55, "to", tid2, tid)
+ poid, pserial, compression, checksum, data, ptid2, ptid = p.decode()
self.assertEqual(oid, poid)
self.assertEqual(serial, pserial)
self.assertEqual(tid, ptid)
+ self.assertEqual(tid2, ptid2)
self.assertEqual(compression, 1)
self.assertEqual(checksum, 55)
self.assertEqual(data, "to")
More information about the Neo-report
mailing list