[Neo-report] r2652 gregory - in /trunk/neo: client/handlers/ lib/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Feb 8 16:47:10 CET 2011
Author: gregory
Date: Tue Feb 8 16:47:10 2011
New Revision: 2652
Log:
New protocol parser, semantic oriented.
Modified:
trunk/neo/client/handlers/master.py
trunk/neo/lib/handler.py
trunk/neo/lib/logger.py
trunk/neo/lib/protocol.py
trunk/neo/lib/util.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Feb 8 16:47:10 2011
@@ -160,7 +160,7 @@ class PrimaryAnswersHandler(AnswerBaseHa
self.app.setHandlerData(ttid)
def answerNewOIDs(self, conn, oid_list):
- self.app.new_oid_list = oid_list
+ self.app.new_oid_list = list(oid_list)
def answerTransactionFinished(self, conn, _, tid):
self.app.setHandlerData(tid)
Modified: trunk/neo/lib/handler.py
==============================================================================
--- trunk/neo/lib/handler.py [iso-8859-1] (original)
+++ trunk/neo/lib/handler.py [iso-8859-1] Tue Feb 8 16:47:10 2011
@@ -133,10 +133,13 @@ class EventHandler(object):
def notify(self, conn, message):
neo.lib.logging.info('notification from %r: %s', conn, message)
- def requestIdentification(self, conn, node_type,
- uuid, address, name):
+ def requestIdentification(self, conn, node_type, uuid, address, name):
raise UnexpectedPacketError
+ def _requestIdentification(self, conn, protocol, node_type,
+ uuid, address, name):
+ self.requestIdentification(conn, node_type, uuid, address, name)
+
def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid):
raise UnexpectedPacketError
@@ -428,7 +431,7 @@ class EventHandler(object):
d[Packets.Error] = self.error
d[Packets.Notify] = self.notify
- d[Packets.RequestIdentification] = self.requestIdentification
+ d[Packets.RequestIdentification] = self._requestIdentification
d[Packets.AcceptIdentification] = self.acceptIdentification
d[Packets.AskPrimary] = self.askPrimary
d[Packets.AnswerPrimary] = self.answerPrimary
Modified: trunk/neo/lib/logger.py
==============================================================================
--- trunk/neo/lib/logger.py [iso-8859-1] (original)
+++ trunk/neo/lib/logger.py [iso-8859-1] Tue Feb 8 16:47:10 2011
@@ -37,8 +37,11 @@ class PacketLogger(object):
klass = packet.getType()
uuid = dump(conn.getUUID())
ip, port = conn.getAddress()
+ packet_name = packet.__class__.__name__
+ if packet.isResponse() and packet._request is not None:
+ packet_name += packet._request.__name__
neo.lib.logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
- packet.__class__.__name__, direction, uuid, ip, port)
+ packet_name, direction, uuid, ip, port)
# look for custom packet logger
logger = self.packet_dispatch_table.get(klass, None)
logger = logger and getattr(self, logger.im_func.__name__, None)
Modified: trunk/neo/lib/protocol.py
==============================================================================
--- trunk/neo/lib/protocol.py [iso-8859-1] (original)
+++ trunk/neo/lib/protocol.py [iso-8859-1] Tue Feb 8 16:47:10 2011
@@ -15,12 +15,13 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from struct import pack, unpack, error, calcsize
+import sys
+import traceback
+from types import ClassType
from socket import inet_ntoa, inet_aton
-from neo.lib.profiling import profiler_decorator
from cStringIO import StringIO
-from neo.lib.util import Enum
+from neo.lib.util import Enum, Struct
# The protocol version (major, minor).
PROTOCOL_VERSION = (4, 1)
@@ -28,13 +29,13 @@ PROTOCOL_VERSION = (4, 1)
# Size restrictions.
MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x4000000
-PACKET_HEADER_FORMAT = '!LHL'
-PACKET_HEADER_SIZE = calcsize(PACKET_HEADER_FORMAT)
+PACKET_HEADER_FORMAT = Struct('!LHL')
# Check that header size is the expected value.
# If it is not, it means that struct module result is incompatible with
# "reference" platform (python 2.4 on x86-64).
-assert PACKET_HEADER_SIZE == 10, \
- 'Unsupported platform, packet header length = %i' % (PACKET_HEADER_SIZE, )
+assert PACKET_HEADER_FORMAT.size == 10, \
+ 'Unsupported platform, packet header length = %i' % \
+ (PACKET_HEADER_FORMAT.size, )
RESPONSE_MASK = 0x8000
class ErrorCodes(Enum):
@@ -109,6 +110,7 @@ INVALID_UUID = '\0' * 16
INVALID_TID = '\xff' * 8
INVALID_OID = '\xff' * 8
INVALID_PARTITION = 0xffffffff
+INVALID_ADDRESS = ('0.0.0.0', 0)
ZERO_TID = '\0' * 8
ZERO_OID = '\0' * 8
OID_LEN = len(INVALID_OID)
@@ -141,128 +143,42 @@ class BrokenNodeDisallowedError(Protocol
""" Just close the connection """
pass
-
-# packet parser
-def _decodeClusterState(state):
- cluster_state = ClusterStates.get(state)
- if cluster_state is None:
- raise PacketMalformedError('invalid cluster state %d' % state)
- return cluster_state
-
-def _decodeNodeState(state):
- node_state = NodeStates.get(state)
- if node_state is None:
- raise PacketMalformedError('invalid node state %d' % state)
- return node_state
-
-def _decodeNodeType(original_node_type):
- node_type = NodeTypes.get(original_node_type)
- if node_type is None:
- raise PacketMalformedError('invalid node type %d' % original_node_type)
- return node_type
-
-def _decodeErrorCode(original_error_code):
- error_code = ErrorCodes.get(original_error_code)
- if error_code is None:
- raise PacketMalformedError('invalid error code %d' %
- original_error_code)
- return error_code
-
-def _decodeLockState(original_lock_state):
- lock_state = LockState.get(original_lock_state)
- if lock_state is None:
- raise PacketMalformedError('invalid lock state %d' % (
- original_lock_state, ))
- return lock_state
-
-def _decodeAddress(address):
- if address == '\0' * 6:
- return None
- (ip, port) = unpack('!4sH', address)
- return (inet_ntoa(ip), port)
-
-def _encodeAddress(address):
- if address is None:
- return '\0' * 6
- # address is a tuple (ip, port)
- return pack('!4sH', inet_aton(address[0]), address[1])
-
-def _decodeUUID(uuid):
- if uuid == INVALID_UUID:
- return None
- return uuid
-
-def _encodeUUID(uuid):
- if uuid is None:
- return INVALID_UUID
- return uuid
-
-def _decodePTID(ptid):
- ptid = unpack('!Q', ptid)[0]
- if ptid == 0:
- return None
- return ptid
-
-def _encodePTID(ptid):
- if ptid is None:
- ptid = 0
- assert isinstance(ptid, (int, long)), ptid
- return pack('!Q', ptid)
-
-def _decodeTID(tid):
- if tid == INVALID_TID:
- return None
- return tid
-
-def _encodeTID(tid):
- if tid is None:
- return INVALID_TID
- return tid
-
-def _decodeString(buf, name, offset=0):
- buf = buf[offset:]
- (size, ) = unpack('!L', buf[:4])
- string = buf[4:4+size]
- if len(string) != size:
- raise PacketMalformedError("can't read string <%s>" % name)
- return (string, buf[offset+4+size:])
-
-@profiler_decorator
-def _encodeString(buf):
- return pack('!L', len(buf)) + buf
-
class Packet(object):
"""
- Base class for any packet definition.
- Each subclass should override _encode() and _decode() and return a string or
- a tuple respectively.
+ Base class for any packet definition. The _fmt class attribute must be
+ defined for any non-empty packet.
"""
-
_ignore_when_closed = False
- _header_format = None
- _header_len = None
_request = None
_answer = None
_body = None
_code = None
+ _fmt = None
_id = None
def __init__(self, *args, **kw):
+ args = list(args)
assert self._code is not None, "Packet class not registered"
- if args != () or kw != {}:
- body = self._encode(*args, **kw)
+ if args or kw:
+ assert self._fmt is not None
+ buf = StringIO()
+ # load named arguments
+ for item in self._fmt._items[len(args):]:
+ args.append(kw.get(item._name))
+ self._fmt.encode(buf.write, args)
+ body = buf.getvalue()
else:
body = ''
self._body = body
def decode(self):
assert self._body is not None
+ if self._fmt is None:
+ return ()
+ buf = StringIO(self._body)
try:
- return self._decode(self._body)
- except error, msg: # struct.error
- name = self.__class__.__name__
- raise PacketMalformedError("%s fail (%s)" % (name, msg))
- except PacketMalformedError, msg:
+ return self._fmt.decode(buf.read)
+ except ParseError, msg:
name = self.__class__.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg))
@@ -278,23 +194,17 @@ class Packet(object):
assert self._id is not None, "No identifier applied on the packet"
return self._id
- def getCode(self):
- return self._code
-
def getType(self):
return self.__class__
- @profiler_decorator
def encode(self):
""" Encode a packet as a string to send it over the network """
content = self._body
- length = PACKET_HEADER_SIZE + len(content)
- return (pack(PACKET_HEADER_FORMAT, self._id, self._code, length),
- content)
+ length = PACKET_HEADER_FORMAT.size + len(content)
+ return (PACKET_HEADER_FORMAT.pack(self._id, self._code, length), content)
- @profiler_decorator
def __len__(self):
- return PACKET_HEADER_SIZE + len(self._body)
+ return PACKET_HEADER_FORMAT.size + len(self._body)
def __repr__(self):
return '%s[%r]' % (self.__class__.__name__, self._id)
@@ -306,17 +216,6 @@ class Packet(object):
assert isinstance(other, Packet)
return self._code == other._code
- def _encode(self, *args, **kw):
- """ Default encoder, join all arguments """
- args = list(args)
- args.extend(kw.values())
- return ''.join([str(i) for i in args] or '')
-
- def _decode(self, body):
- """ Default decoder, message must be empty """
- assert body == '', "Non-empty packet decoding not implemented """
- return ()
-
def isError(self):
return isinstance(self, Error)
@@ -333,1230 +232,907 @@ class Packet(object):
"""
return self._ignore_when_closed
-class Notify(Packet):
+class ParseError(Exception):
"""
- General purpose notification (remote logging)
+ An exception that encapsulate another and build the 'path' of the
+ packet item that generate the error.
"""
- def _encode(self, message):
- return message
+ def __init__(self, item, trace):
+ Exception.__init__(self)
+ self._trace = trace
+ self._items = [item]
- def _decode(self, body):
- return (body, )
+ def append(self, item):
+ self._items.append(item)
-class Ping(Packet):
+ def __repr__(self):
+ chain = '/'.join([item.getName() for item in reversed(self._items)])
+ return 'at %s:\n%s' % (chain, self._trace)
+
+ __str__ = __repr__
+
+# packet parsers
+
+class PItem(object):
"""
- Check if a peer is still alive. Any -> Any.
+ Base class for any packet item, _encode and _decode must be overriden
+ by subclasses.
"""
- pass
+ def __init__(self, name):
+ self._name = name
+
+ def __repr__(self):
+ return self.__class__.__name__
-class Pong(Packet):
+ def getName(self):
+ return self._name
+
+ def _trace(self, method, *args):
+ try:
+ return method(*args)
+ except ParseError, e:
+ # trace and forward exception
+ e.append(self)
+ raise
+ except Exception:
+ # original exception, encapsulate it
+ trace = ''.join(traceback.format_exception(*sys.exc_info())[2:])
+ raise ParseError(self, trace)
+
+ def encode(self, writer, items):
+ return self._trace(self._encode, writer, items)
+
+ def decode(self, reader):
+ return self._trace(self._decode, reader)
+
+ def _encode(self, writer):
+ raise NotImplementedError, self.__class__.__name__
+
+ def _decode(self, reader):
+ raise NotImplementedError, self.__class__.__name__
+
+class PStruct(PItem):
+ """
+ Aggregate other items
+ """
+ def __init__(self, name, *items):
+ PItem.__init__(self, name)
+ self._items = items
+
+ def _encode(self, writer, items):
+ assert len(self._items) == len(items), (items, self._items)
+ for item, value in zip(self._items, items):
+ item.encode(writer, value)
+
+ def _decode(self, reader):
+ return tuple([item.decode(reader) for item in self._items])
+
+class PStructItem(PItem):
+ """
+ A single value encoded with struct
+ """
+ def __init__(self, name, fmt):
+ PItem.__init__(self, name)
+ struct = Struct(fmt)
+ self.pack = struct.pack
+ self.unpack = struct.unpack
+ self.size = struct.size
+
+ def _encode(self, writer, value):
+ writer(self.pack(value))
+
+ def _decode(self, reader):
+ return self.unpack(reader(self.size))[0]
+
+class PList(PStructItem):
+ """
+ A list of homogeneous items
+ """
+ def __init__(self, name, item):
+ PStructItem.__init__(self, name, '!L')
+ self._item = item
+
+ def _encode(self, writer, items):
+ assert isinstance(items, (list, tuple, set)), (type(items), items)
+ writer(self.pack(len(items)))
+ item = self._item
+ for value in items:
+ item.encode(writer, value)
+
+ def _decode(self, reader):
+ length = self.unpack(reader(self.size))[0]
+ item = self._item
+ return [item.decode(reader) for _ in xrange(length)]
+
+class PDict(PStructItem):
+ """
+ A dictionary with custom key and value formats
+ """
+ def __init__(self, name, key, value):
+ PStructItem.__init__(self, name, '!L')
+ self._key = key
+ self._value = value
+
+ def _encode(self, writer, item):
+ assert isinstance(item , dict), (type(item), item)
+ writer(self.pack(len(item)))
+ key, value = self._key, self._value
+ for k, v in item.iteritems():
+ key.encode(writer, k)
+ value.encode(writer, v)
+
+ def _decode(self, reader):
+ length = self.unpack(reader(self.size))[0]
+ key, value = self._key, self._value
+ new_dict = {}
+ for _ in xrange(length):
+ k = key.decode(reader)
+ v = value.decode(reader)
+ new_dict[k] = v
+ return new_dict
+
+class PEnum(PStructItem):
"""
- Notify being alive. Any -> Any.
+ Encapsulate an enumeration value
"""
- pass
+ def __init__(self, name, enum):
+ PStructItem.__init__(self, name, '!L')
+ self._enum = enum
-class RequestIdentification(Packet):
+ def _encode(self, writer, item):
+ assert isinstance(item, int), item
+ writer(self.pack(item))
+
+ def _decode(self, reader):
+ code = self.unpack(reader(self.size))[0]
+ try:
+ return self._enum[code]
+ except KeyError:
+ enum = self._enum.__class__.__name__
+ raise ValueError, 'Invalid code for %s enum: %r' % (enum, code)
+
+class PAddress(PStructItem):
+ """
+ An host address (IPv4 for now)
+ """
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!4sH')
+
+ def _encode(self, writer, address):
+ if address is None:
+ address = INVALID_ADDRESS
+ assert len(address) == 2, address
+ host, port = address
+ host = inet_aton(host)
+ writer(self.pack(host, port))
+
+ def _decode(self, reader):
+ data = reader(self.size)
+ host, port = self.unpack(data)
+ host = inet_ntoa(host)
+ if (host, port) == INVALID_ADDRESS:
+ return None
+ return (host, port)
+
+class PString(PStructItem):
"""
- Request a node identification. This must be the first packet for any
- connection. Any -> Any.
+ A variable-length string
+ """
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!L')
+
+ def _encode(self, writer, value):
+ writer(self.pack(len(value)))
+ writer(value)
+
+ def _decode(self, reader):
+ length = self.unpack(reader(self.size))[0]
+ return reader(length)
+
+class PBoolean(PStructItem):
+ """
+ A boolean value, encoded as a single byte
+ """
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!B')
+
+ def _encode(self, writer, value):
+ writer(self.pack(bool(value)))
+
+ def _decode(self, reader):
+ return bool(self.unpack(reader(self.size))[0])
+
+class PNumber(PStructItem):
+ """
+ A integer number (4-bytes length)
+ """
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!L')
+
+class PChecksum(PStructItem):
+ """
+ A checksum
+ """
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!Q')
+
+class PIndex(PStructItem):
+ """
+ A big integer to defined indexes in a huge list.
+ """
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!Q')
+
+class PPTID(PStructItem):
+ """
+ A None value means an invalid PTID
"""
- _header_format = '!LLH16s6s'
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!Q')
- def _encode(self, node_type, uuid, address, name):
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- return pack(self._header_format, PROTOCOL_VERSION[0],
- PROTOCOL_VERSION[1], node_type, uuid, address) + \
- _encodeString(name)
-
- def _decode(self, body):
- r = unpack(self._header_format, body[:self._header_len])
- major, minor, node_type, uuid, address = r
- address = _decodeAddress(address)
- (name, _) = _decodeString(body, 'name', offset=self._header_len)
- node_type = _decodeNodeType(node_type)
- uuid = _decodeUUID(uuid)
+ def _encode(self, writer, value):
+ if value is None:
+ value = 0
+ PStructItem._encode(self, writer, value)
+
+ def _decode(self, reader):
+ value = PStructItem._decode(self, reader)
+ if value == 0:
+ value = None
+ return value
+
+class PProtocol(PStructItem):
+ """
+ The protocol version definition
+ """
+ def __init__(self, name):
+ PStructItem.__init__(self, name, '!LL')
+
+ def _encode(self, writer, version):
+ writer(self.pack(*version))
+
+ def _decode(self, reader):
+ major, minor = self.unpack(reader(self.size))
if (major, minor) != PROTOCOL_VERSION:
- raise PacketMalformedError('protocol version mismatch')
- return (node_type, uuid, address, name)
+ raise ProtocolError('protocol version mismatch')
+ return (major, minor)
-class AcceptIdentification(Packet):
+class PUUID(PItem):
"""
- Accept a node identification. This should be a reply to Request Node
- Identification. Any -> Any.
+ An UUID (node identifier)
"""
- _header_format = '!H16sLL16s'
-
- def _encode(self, node_type, uuid,
- num_partitions, num_replicas, your_uuid):
- uuid = _encodeUUID(uuid)
- your_uuid = _encodeUUID(your_uuid)
- return pack(self._header_format, node_type, uuid,
- num_partitions, num_replicas, your_uuid)
-
- def _decode(self, body):
- r = unpack(self._header_format, body)
- node_type, uuid, num_partitions, num_replicas, your_uuid = r
- node_type = _decodeNodeType(node_type)
- uuid = _decodeUUID(uuid)
- your_uuid = _decodeUUID(your_uuid)
- return (node_type, uuid, num_partitions, num_replicas, your_uuid)
+ def _encode(self, writer, uuid):
+ if uuid is None:
+ uuid = INVALID_UUID
+ assert len(uuid) == 16, (len(uuid), uuid)
+ writer(uuid)
+
+ def _decode(self, reader):
+ uuid = reader(16)
+ if uuid == INVALID_UUID:
+ uuid = None
+ return uuid
+
+class PTID(PItem):
+ """
+ A transaction identifier
+ """
+ def _encode(self, writer, tid):
+ if tid is None:
+ tid = INVALID_TID
+ assert len(tid) == 8, (len(tid), tid)
+ writer(tid)
+
+ def _decode(self, reader):
+ tid = reader(8)
+ if tid == INVALID_TID:
+ tid = None
+ return tid
+
+# same definition, for now
+POID = PTID
+
+# common definitions
+
+PFEmpty = PStruct('no_content')
+PFNodeType = PEnum('type', NodeTypes)
+PFNodeState = PEnum('state', NodeStates)
+PFCellState = PEnum('state', CellStates)
+
+PFNodeList = PList('node_list',
+ PStruct('node',
+ PFNodeType,
+ PAddress('address'),
+ PUUID('uuid'),
+ PFNodeState,
+ ),
+)
+
+PFCellList = PList('cell_list',
+ PStruct('cell',
+ PUUID('uuid'),
+ PFCellState,
+ ),
+)
+
+PFRowList = PList('row_list',
+ PStruct('row',
+ PNumber('offset'),
+ PFCellList,
+ ),
+)
+
+PFHistoryList = PList('history_list',
+ PStruct('history_entry',
+ PTID('serial'),
+ PNumber('size'),
+ ),
+)
+
+PFUUIDList = PList('uuid_list',
+ PUUID('uuid'),
+)
+
+PFTidList = PList('tid_list',
+ PTID('tid'),
+)
+
+PFOidList = PList('oid_list',
+ POID('oid'),
+)
-class AskPrimary(Packet):
+# packets definition
+
+class Notify(Packet):
"""
- Ask a current primary master node. This must be the second message when
- connecting to a master node. Any -> M.
+ General purpose notification (remote logging)
"""
- pass
+ _fmt = PStruct('notify',
+ PString('message'),
+ )
+
+class Error(Packet):
+ """
+ Error is a special type of message, because this can be sent against
+ any other message, even if such a message does not expect a reply
+ usually. Any -> Any.
+ """
+ _fmt = PStruct('error',
+ PNumber('code'),
+ PString('message'),
+ )
+
+class Ping(Packet):
+ """
+ Check if a peer is still alive. Any -> Any.
+ """
+ _answer = PFEmpty
-class AnswerPrimary(Packet):
+class RequestIdentification(Packet):
+ """
+ Request a node identification. This must be the first packet for any
+ connection. Any -> Any.
"""
+
+ _fmt = PStruct('request_identification',
+ PProtocol('protocol_version'),
+ PFNodeType,
+ PUUID('uuid'),
+ PAddress('address'),
+ PString('name'),
+ )
+
+ _answer = PStruct('accept_identification',
+ PFNodeType,
+ PUUID('my_uuid'),
+ PNumber('num_partitions'),
+ PNumber('num_replicas'),
+ PUUID('your_uuid'),
+ )
+
+ def __init__(self, *args, **kw):
+ if args or kw:
+ # always announce current protocol version
+ args = list(args)
+ args.insert(0, PROTOCOL_VERSION)
+ super(RequestIdentification, self).__init__(*args, **kw)
+
+class PrimaryMaster(Packet):
+ """
+ Ask a current primary master node. This must be the second message when
+ connecting to a master node. Any -> M.
Reply to Ask Primary Master. This message includes a list of known master
nodes to make sure that a peer has the same information. M -> Any.
"""
- _header_format = '!16sL'
- _list_entry_format = '!6s16s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, primary_uuid, known_master_list):
- primary_uuid = _encodeUUID(primary_uuid)
- body = [pack(self._header_format, primary_uuid,
- len(known_master_list))]
- for address, uuid in known_master_list:
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- body.append(pack(self._list_entry_format, address, uuid))
- return ''.join(body)
-
- def _decode(self, body):
- packet_offset = self._header_len
- (primary_uuid, n) = unpack(self._header_format,
- body[:packet_offset])
- known_master_list = []
- list_entry_len = self._list_entry_len
- list_entry_format = self._list_entry_format
- for _ in xrange(n):
- next_packet_offset = packet_offset + list_entry_len
- address, uuid = unpack(list_entry_format,
- body[packet_offset:next_packet_offset])
- packet_offset = next_packet_offset
- address = _decodeAddress(address)
- uuid = _decodeUUID(uuid)
- known_master_list.append((address, uuid))
- primary_uuid = _decodeUUID(primary_uuid)
- return (primary_uuid, known_master_list)
+ _answer = PStruct('answer_primary',
+ PUUID('primary_uuid'),
+ PList('known_master_list',
+ PStruct('master',
+ PAddress('address'),
+ PUUID('uuid'),
+ ),
+ ),
+ )
class AnnouncePrimary(Packet):
"""
Announce a primary master node election. PM -> SM.
"""
- pass
class ReelectPrimary(Packet):
"""
Force a re-election of a primary master node. M -> M.
"""
- pass
-class AskLastIDs(Packet):
+class LastIDs(Packet):
"""
Ask the last OID, the last TID and the last Partition Table ID that
a storage node stores. Used to recover information. PM -> S, S -> PM.
- """
- pass
-
-class AnswerLastIDs(Packet):
- """
Reply to Ask Last IDs. S -> PM, PM -> S.
"""
- def _encode(self, loid, ltid, lptid):
- # in this case, loid is a valid OID but considered as invalid. This is
- # not an issue because the OID 0 is hard coded and will never be
- # generated
- if loid is None:
- loid = INVALID_OID
- ltid = _encodeTID(ltid)
- lptid = _encodePTID(lptid)
- return loid + ltid + lptid
-
- def _decode(self, body):
- (loid, ltid, lptid) = unpack('!8s8s8s', body)
- if loid == INVALID_OID:
- loid = None
- ltid = _decodeTID(ltid)
- lptid = _decodePTID(lptid)
- return (loid, ltid, lptid)
+ _answer = PStruct('answer_last_ids',
+ POID('last_oid'),
+ PTID('last_tid'),
+ PPTID('last_ptid'),
+ )
-class AskPartitionTable(Packet):
+class PartitionTable(Packet):
"""
Ask the full partition table. PM -> S.
- """
- pass
-
-class AnswerPartitionTable(Packet):
- """
Answer rows in a partition table. S -> PM.
"""
- _header_format = '!8sL'
- _row_entry_format = '!LL'
- _row_entry_len = calcsize(_row_entry_format)
- _cell_entry_format = '!16sH'
- _cell_entry_len = calcsize(_cell_entry_format)
-
- def _encode(self, ptid, row_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(row_list))]
- row_entry_format = self._row_entry_format
- cell_entry_format = self._cell_entry_format
- for offset, cell_list in row_list:
- body.append(pack(row_entry_format, offset, len(cell_list)))
- for uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(cell_entry_format, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- index = self._header_len
- (ptid, n) = unpack(self._header_format, body[:index])
- ptid = _decodePTID(ptid)
- row_list = []
- cell_list = []
- row_entry_format = self._row_entry_format
- row_entry_len = self._row_entry_len
- cell_entry_format = self._cell_entry_format
- cell_entry_len = self._cell_entry_len
- for _ in xrange(n):
- next_index = index + row_entry_len
- offset, m = unpack(row_entry_format, body[index:next_index])
- index = next_index
- for _ in xrange(m):
- next_index = index + cell_entry_len
- uuid, state = unpack(cell_entry_format, body[index:next_index])
- index = next_index
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((uuid, state))
- row_list.append((offset, tuple(cell_list)))
- del cell_list[:]
- return (ptid, row_list)
+ _answer = PStruct('answer_partition_table',
+ PPTID('ptid'),
+ PFRowList,
+ )
-class SendPartitionTable(Packet):
+class NotifyPartitionTable(Packet):
"""
Send rows in a partition table to update other nodes. PM -> S, C.
"""
- _header_format = '!8sL'
- _row_entry_format = '!LL'
- _row_entry_len = calcsize(_row_entry_format)
- _cell_entry_format = '!16sH'
- _cell_entry_len = calcsize(_cell_entry_format)
-
- def _encode(self, ptid, row_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(row_list))]
- row_entry_format = self._row_entry_format
- cell_entry_format = self._cell_entry_format
- for offset, cell_list in row_list:
- body.append(pack(row_entry_format, offset, len(cell_list)))
- for uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(cell_entry_format, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- index = self._header_len
- (ptid, n,) = unpack(self._header_format, body[:index])
- ptid = _decodePTID(ptid)
- row_list = []
- cell_list = []
- row_entry_format = self._row_entry_format
- row_entry_len = self._row_entry_len
- cell_entry_format = self._cell_entry_format
- cell_entry_len = self._cell_entry_len
- for _ in xrange(n):
- next_index = index + row_entry_len
- offset, m = unpack(row_entry_format, body[index:next_index])
- index = next_index
- for _ in xrange(m):
- next_index = index + cell_entry_len
- uuid, state = unpack(cell_entry_format, body[index:next_index])
- index = next_index
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((uuid, state))
- row_list.append((offset, tuple(cell_list)))
- del cell_list[:]
- return (ptid, row_list)
+ _fmt = PStruct('send_partition_table',
+ PPTID('ptid'),
+ PFRowList,
+ )
-class NotifyPartitionChanges(Packet):
+class PartitionChanges(Packet):
"""
Notify a subset of a partition table. This is used to notify changes.
PM -> S, C.
"""
- _header_format = '!8sL'
- _list_entry_format = '!L16sH'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, ptid, cell_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(cell_list))]
- list_entry_format = self._list_entry_format
- for offset, uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(list_entry_format, offset, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- packet_offset = self._header_len
- (ptid, n) = unpack(self._header_format, body[:packet_offset])
- ptid = _decodePTID(ptid)
- cell_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_packet_offset = packet_offset + list_entry_len
- (offset, uuid, state) = unpack(list_entry_format,
- body[packet_offset:next_packet_offset])
- packet_offset = next_packet_offset
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((offset, uuid, state))
- return (ptid, cell_list)
+ _fmt = PStruct('notify_partition_changes',
+ PPTID('ptid'),
+ PList('cell_list',
+ PStruct('cell',
+ PNumber('offset'),
+ PUUID('uuid'),
+ PFNodeState,
+ ),
+ ),
+ )
-class NotifyReplicationDone(Packet):
+class ReplicationDone(Packet):
"""
Notify the master node that a partition has been successully replicated from
a storage to another.
S -> M
"""
- _header_format = '!L'
-
- def _encode(self, offset):
- return pack(self._header_format, offset)
-
- def _decode(self, body):
- (offset, ) = unpack(self._header_format, body)
- return (offset, )
+ _fmt = PStruct('notify_replication_done',
+ PNumber('offset'),
+ )
class StartOperation(Packet):
"""
Tell a storage nodes to start an operation. Until a storage node receives
this message, it must not serve client nodes. PM -> S.
"""
- pass
class StopOperation(Packet):
"""
Tell a storage node to stop an operation. Once a storage node receives
this message, it must not serve client nodes. PM -> S.
"""
- pass
-class AskUnfinishedTransactions(Packet):
+class UnfinishedTransactions(Packet):
"""
Ask unfinished transactions PM -> S.
- """
- pass
-
-class AnswerUnfinishedTransactions(Packet):
- """
Answer unfinished transactions S -> PM.
"""
- _header_format = '!8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, max_tid, tid_list):
- body = [pack(self._header_format, max_tid, len(tid_list))]
- body.extend(tid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (max_tid, n) = unpack(self._header_format, body[:offset])
- tid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- tid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- tid_list.append(tid)
- return (max_tid, tid_list)
+ _answer = PStruct('answer_unfinished_transactions',
+ PTID('max_tid'),
+ PList('tid_list',
+ PTID('unfinished_tid'),
+ ),
+ )
-class AskObjectPresent(Packet):
+class ObjectPresent(Packet):
"""
Ask if an object is present. If not present, OID_NOT_FOUND should be
returned. PM -> S.
- """
- def _decode(self, body):
- (oid, tid) = unpack('8s8s', body)
- return (oid, _decodeTID(tid))
-
-class AnswerObjectPresent(Packet):
- """
Answer that an object is present. PM -> S.
"""
- def _decode(self, body):
- (oid, tid) = unpack('8s8s', body)
- return (oid, _decodeTID(tid))
+ _fmt = PStruct('object_present',
+ POID('oid'),
+ PTID('tid'),
+ )
+
+ _answer = PStruct('object_present',
+ POID('oid'),
+ PTID('tid'),
+ )
class DeleteTransaction(Packet):
"""
Delete a transaction. PM -> S.
"""
- _header_format = '!8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid, oid_list):
- body = [pack(self._header_format, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (tid, oid_list)
+ _fmt = PStruct('delete_transaction',
+ PTID('tid'),
+ PFOidList,
+ )
class CommitTransaction(Packet):
"""
Commit a transaction. PM -> S.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
+ _fmt = PStruct('commit_transaction',
+ PTID('tid'),
+ )
-class AskBeginTransaction(Packet):
+class BeginTransaction(Packet):
"""
Ask to begin a new transaction. C -> PM.
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- return (_decodeTID(unpack('8s', body)[0]), )
-
-class AnswerBeginTransaction(Packet):
- """
Answer when a transaction begins, give a TID if necessary. PM -> C.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
+ _fmt = PStruct('ask_begin_transaction',
+ PTID('tid'),
+ )
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
+ _answer = PStruct('answer_begin_transaction',
+ PTID('tid'),
+ )
-class AskFinishTransaction(Packet):
+class FinishTransaction(Packet):
"""
Finish a transaction. C -> PM.
+ Answer when a transaction is finished. PM -> C.
"""
- _header_format = '!8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid, oid_list):
- body = [pack(self._header_format, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (tid, oid_list)
+ _fmt = PStruct('ask_finish_transaction',
+ PTID('tid'),
+ PFOidList,
+ )
+
+ _answer = PStruct('answer_information_locked',
+ PTID('ttid'),
+ PTID('tid'),
+ )
class NotifyTransactionFinished(Packet):
"""
Notify that a transaction blocking a replication is now finished
M -> S
"""
- def _encode(self, ttid, max_tid):
- return _encodeTID(ttid) + _encodeTID(max_tid)
-
- def _decode(self, body):
- (ttid, max_tid) = unpack('8s8s', body)
- return (ttid, max_tid)
-
-class AnswerTransactionFinished(Packet):
- """
- Answer when a transaction is finished. PM -> C.
- """
- def _encode(self, ttid, tid):
- return _encodeTID(ttid) + _encodeTID(tid)
-
- def _decode(self, body):
- (ttid, tid) = unpack('8s8s', body)
- return (_decodeTID(ttid), _decodeTID(tid))
+ _fmt = PStruct('notify_transaction_finished',
+ PTID('ttid'),
+ PTID('max_tid'),
+ )
-class AskLockInformation(Packet):
+class LockInformation(Packet):
"""
Lock information on a transaction. PM -> S.
- """
- # XXX: Identical to InvalidateObjects and AskFinishTransaction
- _header_format = '!8s8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, ttid, tid, oid_list):
- body = [pack(self._header_format, ttid, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (ttid, tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (ttid, tid, oid_list)
-
-class AnswerInformationLocked(Packet):
- """
Notify information on a transaction locked. S -> PM.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
+ _fmt = PStruct('ask_lock_informations',
+ PTID('ttid'),
+ PTID('tid'),
+ PFOidList,
+ )
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
+ _answer = PStruct('answer_information_locked',
+ PTID('tid'),
+ )
class InvalidateObjects(Packet):
"""
Invalidate objects. PM -> C.
"""
- _header_format = '!8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid, oid_list):
- body = [pack(self._header_format, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (tid, oid_list)
+ _fmt = PStruct('ask_finish_transaction',
+ PTID('tid'),
+ PFOidList,
+ )
-class NotifyUnlockInformation(Packet):
+class UnlockInformation(Packet):
"""
Unlock information on a transaction. PM -> S.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
+ _fmt = PStruct('notify_unlock_information',
+ PTID('tid'),
+ )
-class AskNewOIDs(Packet):
+class GenerateOIDs(Packet):
"""
Ask new object IDs. C -> PM.
- """
- _header_format = '!H'
-
- def _encode(self, num_oids):
- return pack(self._header_format, num_oids)
-
- def _decode(self, body):
- return unpack(self._header_format, body) # num oids
-
-class AnswerNewOIDs(Packet):
- """
Answer new object IDs. PM -> C.
"""
- _header_format = '!H'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, oid_list):
- body = [pack(self._header_format, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n,) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (oid_list,)
+ _fmt = PStruct('ask_new_oids',
+ PNumber('num_oids'),
+ )
-class AskStoreObject(Packet):
+ _answer = PStruct('answer_new_oids',
+ PFOidList,
+ )
+
+class StoreObject(Packet):
"""
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
- """
- _header_format = '!8s8s8sBL8sB'
-
- @profiler_decorator
- def _encode(self, oid, serial, compression, checksum, data, data_serial,
- tid, unlock):
- if serial is None:
- serial = INVALID_TID
- if data_serial is None:
- data_serial = INVALID_TID
- unlock = unlock and 1 or 0
- return pack(self._header_format, oid, serial, tid, compression,
- checksum, data_serial, unlock) + _encodeString(data)
-
- def _decode(self, body):
- header_len = self._header_len
- r = unpack(self._header_format, body[:header_len])
- oid, serial, tid, compression, checksum, data_serial, unlock = r
- serial = _decodeTID(serial)
- data_serial = _decodeTID(data_serial)
- (data, _) = _decodeString(body, 'data', offset=header_len)
- return (oid, serial, compression, checksum, data, data_serial, tid,
- bool(unlock))
-
-class AnswerStoreObject(Packet):
- """
Answer if an object has been stored. If an object is in conflict,
a serial of the conflicting transaction is returned. In this case,
if this serial is newer than the current transaction ID, a client
node must not try to resolve the conflict. S -> C.
"""
- _header_format = '!B8s8s'
+ _fmt = PStruct('ask_store_object',
+ POID('oid'),
+ PTID('serial'),
+ PBoolean('compression'),
+ PNumber('checksum'),
+ PString('data'),
+ PTID('data_serial'),
+ PTID('tid'),
+ PBoolean('unlock'),
+ )
- def _encode(self, conflicting, oid, serial):
- if serial is None:
- serial = INVALID_TID
- return pack(self._header_format, conflicting, oid, serial)
-
- def _decode(self, body):
- (conflicting, oid, serial) = unpack(self._header_format, body)
- return (conflicting, oid, serial)
+ _answer = PStruct('answer_store_object',
+ PBoolean('conflicting'),
+ POID('oid'),
+ PTID('serial'),
+ )
class AbortTransaction(Packet):
"""
Abort a transaction. C -> S, PM.
"""
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
+ _fmt = PStruct('abort_transaction',
+ PTID('tid'),
+ )
-class AskStoreTransaction(Packet):
+class StoreTransaction(Packet):
"""
Ask to store a transaction. C -> S.
- """
- _header_format = '!8sLHHH'
-
- def _encode(self, tid, user, desc, ext, oid_list):
- lengths = (len(oid_list), len(user), len(desc), len(ext))
- body = [pack(self._header_format, tid, *lengths)]
- body.append(user)
- body.append(desc)
- body.append(ext)
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- r = unpack(self._header_format, body[:self._header_len])
- tid, oid_len, user_len, desc_len, ext_len = r
- body = body[self._header_len:]
- user = body[:user_len]
- body = body[user_len:]
- desc = body[:desc_len]
- body = body[desc_len:]
- ext = body[:ext_len]
- body = body[ext_len:]
- oid_list = []
- for _ in xrange(oid_len):
- (oid, ) = unpack('8s', body[:8])
- body = body[8:]
- oid_list.append(oid)
- return (tid, user, desc, ext, oid_list)
-
-class AnswerStoreTransaction(Packet):
- """
Answer if transaction has been stored. S -> C.
"""
- def _encode(self, tid):
- return _encodeTID(tid)
+ _fmt = PStruct('ask_store_transaction',
+ PTID('tid'),
+ PString('user'),
+ PString('description'),
+ PString('extension'),
+ PFOidList,
+ )
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
+ _answer = PStruct('answer_store_transaction',
+ PTID('tid'),
+ )
-class AskObject(Packet):
+class GetObject(Packet):
"""
Ask a stored object by its OID and a serial or a TID if given. If a serial
is specified, the specified revision of an object will be returned. If
a TID is specified, an object right before the TID will be returned. S,C -> S.
- """
- _header_format = '!8s8s8s'
-
- def _encode(self, oid, serial, tid):
- tid = _encodeTID(tid)
- serial = _encodeTID(serial) # serial is the previous TID
- return pack(self._header_format, oid, serial, tid)
-
- def _decode(self, body):
- (oid, serial, tid) = unpack(self._header_format, body)
- if serial == INVALID_TID:
- serial = None
- tid = _decodeTID(tid)
- return (oid, serial, tid)
-
-class AnswerObject(Packet):
- """
Answer the requested object. S -> C.
"""
- _header_format = '!8s8s8s8sBL'
+ _fmt = PStruct('ask_object',
+ POID('oid'),
+ PTID('serial'),
+ PTID('tid'),
+ )
- def _encode(self, oid, serial_start, serial_end, compression,
- checksum, data, data_serial):
- if serial_start is None:
- serial_start = INVALID_TID
- if serial_end is None:
- serial_end = INVALID_TID
- if data_serial is None:
- data_serial = INVALID_TID
- return pack(self._header_format, oid, serial_start, serial_end,
- data_serial, compression, checksum) + _encodeString(data)
-
- def _decode(self, body):
- header_len = self._header_len
- r = unpack(self._header_format, body[:header_len])
- oid, serial_start, serial_end, data_serial, compression, checksum = r
- if serial_end == INVALID_TID:
- serial_end = None
- if data_serial == INVALID_TID:
- data_serial = None
- (data, _) = _decodeString(body, 'data', offset=header_len)
- return (oid, serial_start, serial_end, compression, checksum, data,
- data_serial)
+ _answer = PStruct('answer_object',
+ POID('oid'),
+ PTID('serial_start'),
+ PTID('serial_end'),
+ PBoolean('compression'),
+ PNumber('checksum'),
+ PString('data'),
+ PTID('data_serial'),
+ )
-class AskTIDs(Packet):
+class TIDList(Packet):
"""
Ask for TIDs between a range of offsets. The order of TIDs is descending,
and the range is [first, last). C -> S.
- """
- _header_format = '!QQL'
-
- def _encode(self, first, last, partition):
- return pack(self._header_format, first, last, partition)
-
- def _decode(self, body):
- return unpack(self._header_format, body) # first, last, partition
-
-class AnswerTIDs(Packet):
- """
Answer the requested TIDs. S -> C.
"""
- _header_format = '!L'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid_list):
- body = [pack(self._header_format, len(tid_list))]
- body.extend(tid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n, ) = unpack(self._header_format, body[:offset])
- tid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- tid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- tid_list.append(tid)
- return (tid_list,)
+ _fmt = PStruct('ask_tids',
+ PIndex('first'),
+ PIndex('last'),
+ PNumber('partition'),
+ )
+
+ _answer = PStruct('answer_tids',
+ PFTidList,
+ )
-class AskTIDsFrom(Packet):
+class TIDListFrom(Packet):
"""
Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
S -> S.
- """
- _header_format = '!8s8sLL'
- _list_entry_format = 'L'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, min_tid, max_tid, length, partition_list):
- body = [pack(self._header_format, min_tid, max_tid, length,
- len(partition_list))]
- list_entry_format = self._list_entry_format
- for partition in partition_list:
- body.append(pack(list_entry_format, partition))
- return ''.join(body)
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- header = unpack(self._header_format, read(self._header_len))
- min_tid, max_tid, length, list_length = header
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- partition_list = []
- for _ in xrange(list_length):
- partition = unpack(list_entry_format, read(list_entry_len))[0]
- partition_list.append(partition)
- return (min_tid, max_tid, length, partition_list)
-
-class AnswerTIDsFrom(AnswerTIDs):
- """
Answer the requested TIDs. S -> S
"""
- pass
+ _fmt = PStruct('tid_list_from',
+ PTID('min_tid'),
+ PTID('max_tid'),
+ PNumber('length'),
+ PList('partition_list',
+ PNumber('partition'),
+ ),
+ )
-class AskTransactionInformation(Packet):
- """
- Ask information about a transaction. Any -> S.
- """
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
+ _answer = PStruct('answer_tids',
+ PFTidList,
+ )
-class AnswerTransactionInformation(Packet):
+class TransactionInformation(Packet):
"""
+ Ask information about a transaction. Any -> S.
Answer information (user, description) about a transaction. S -> Any.
"""
- _header_format = '!8sHHHBL'
+ _fmt = PStruct('ask_transaction_information',
+ PTID('tid'),
+ )
- def _encode(self, tid, user, desc, ext, packed, oid_list):
- packed = packed and 1 or 0
- body = [pack(self._header_format, tid, len(user), len(desc), len(ext),
- packed, len(oid_list))]
- body.append(user)
- body.append(desc)
- body.append(ext)
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- r = unpack(self._header_format, body[:self._header_len])
- tid, user_len, desc_len, ext_len, packed, oid_len = r
- packed = bool(packed)
- body = body[self._header_len:]
- user = body[:user_len]
- body = body[user_len:]
- desc = body[:desc_len]
- body = body[desc_len:]
- ext = body[:ext_len]
- body = body[ext_len:]
- oid_list = []
- for _ in xrange(oid_len):
- (oid, ) = unpack('8s', body[:8])
- body = body[8:]
- oid_list.append(oid)
- return (tid, user, desc, ext, packed, oid_list)
+ _answer = PStruct('answer_transaction_information',
+ PTID('tid'),
+ PString('user'),
+ PString('description'),
+ PString('extension'),
+ PBoolean('packed'),
+ PFOidList,
+ )
-class AskObjectHistory(Packet):
+class ObjectHistory(Packet):
"""
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. C -> S.
- """
- _header_format = '!8sQQ'
-
- def _encode(self, oid, first, last):
- return pack(self._header_format, oid, first, last)
-
- def _decode(self, body):
- (oid, first, last) = unpack(self._header_format, body)
- return (oid, first, last)
-
-class AnswerObjectHistory(Packet):
- """
Answer history information (serial, size) for an object. S -> C.
"""
- _header_format = '!8sL'
- _list_entry_format = '!8sL'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, oid, history_list):
- body = [pack(self._header_format, oid, len(history_list))]
- list_entry_format = self._list_entry_format
- for serial, size in history_list:
- body.append(pack(list_entry_format, serial, size))
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (oid, length) = unpack(self._header_format, body[:offset])
- history_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(length):
- next_offset = offset + list_entry_len
- serial, size = unpack(list_entry_format, body[offset:next_offset])
- offset = next_offset
- history_list.append((serial, size))
- return (oid, history_list)
+ _fmt = PStruct('ask_object_history',
+ POID('oid'),
+ PIndex('first'),
+ PIndex('last'),
+ )
+
+ _answer = PStruct('answer_object_history',
+ POID('oid'),
+ PFHistoryList,
+ )
-class AskObjectHistoryFrom(Packet):
+class ObjectHistoryFrom(Packet):
"""
Ask history information for a given object. The order of serials is
ascending, and starts at (or above) min_serial for min_oid. S -> S.
- """
- _header_format = '!8s8s8sLL'
-
- def _encode(self, min_oid, min_serial, max_serial, length, partition):
- return pack(self._header_format, min_oid, min_serial, max_serial,
- length, partition)
-
- def _decode(self, body):
- # min_oid, min_serial, length, partition
- return unpack(self._header_format, body)
-
-class AnswerObjectHistoryFrom(Packet):
- """
Answer the requested serials. S -> S.
"""
- _header_format = '!L'
- _list_entry_format = '!8sL'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, object_dict):
- body = [pack(self._header_format, len(object_dict))]
- append = body.append
- extend = body.extend
- list_entry_format = self._list_entry_format
- for oid, serial_list in object_dict.iteritems():
- append(pack(list_entry_format, oid, len(serial_list)))
- extend(serial_list)
- return ''.join(body)
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- object_dict = {}
- dict_len = unpack(self._header_format, read(self._header_len))[0]
- for _ in xrange(dict_len):
- oid, serial_len = unpack(list_entry_format, read(list_entry_len))
- object_dict[oid] = [read(TID_LEN) for _ in xrange(serial_len)]
- return (object_dict, )
+ _fmt = PStruct('ask_object_history',
+ POID('min_oid'),
+ PTID('min_serial'),
+ PTID('max_serial'),
+ PNumber('length'),
+ PNumber('partition'),
+ )
+
+ _answer = PStruct('ask_finish_transaction',
+ PDict('object_dict',
+ POID('oid'),
+ PFTidList,
+ ),
+ )
-class AskPartitionList(Packet):
+class PartitionList(Packet):
"""
All the following messages are for neoctl to admin node
Ask information about partition
- """
- _header_format = '!LL16s'
-
- def _encode(self, min_offset, max_offset, uuid):
- uuid = _encodeUUID(uuid)
- body = [pack(self._header_format, min_offset, max_offset, uuid)]
- return ''.join(body)
-
- def _decode(self, body):
- (min_offset, max_offset, uuid) = unpack(self._header_format, body)
- uuid = _decodeUUID(uuid)
- return (min_offset, max_offset, uuid)
-
-class AnswerPartitionList(Packet):
- """
Answer information about partition
"""
- _header_format = '!8sL'
- _row_entry_format = '!LL'
- _row_entry_len = calcsize(_row_entry_format)
- _cell_entry_format = '!16sH'
- _cell_entry_len = calcsize(_cell_entry_format)
-
- def _encode(self, ptid, row_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(row_list))]
- row_entry_format = self._row_entry_format
- cell_entry_format = self._cell_entry_format
- for offset, cell_list in row_list:
- body.append(pack(row_entry_format, offset, len(cell_list)))
- for uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(cell_entry_format, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- index = self._header_len
- (ptid, n) = unpack(self._header_format, body[:index])
- ptid = _decodePTID(ptid)
- row_list = []
- cell_list = []
- row_entry_format = self._row_entry_format
- row_entry_len = self._row_entry_len
- cell_entry_format = self._cell_entry_format
- cell_entry_len = self._cell_entry_len
- for _ in xrange(n):
- next_index = index + row_entry_len
- offset, m = unpack(row_entry_format, body[index:next_index])
- index = next_index
- for _ in xrange(m):
- next_index = index + cell_entry_len
- uuid, state = unpack(cell_entry_format, body[index:next_index])
- index = next_index
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((uuid, state))
- row_list.append((offset, tuple(cell_list)))
- del cell_list[:]
- return (ptid, row_list)
-
-class AskNodeList(Packet):
- """
- Ask information about nodes
- """
- _header_format = '!H'
-
- def _encode(self, node_type):
- return ''.join([pack(self._header_format, node_type)])
+ _fmt = PStruct('ask_partition_list',
+ PNumber('min_offset'),
+ PNumber('max_offset'),
+ PUUID('uuid'),
+ )
- def _decode(self, body):
- (node_type, ) = unpack(self._header_format, body)
- node_type = _decodeNodeType(node_type)
- return (node_type,)
+ _answer = PStruct('answer_partition_list',
+ PPTID('ptid'),
+ PFRowList,
+ )
-class AnswerNodeList(Packet):
+class NodeList(Packet):
"""
+ Ask information about nodes
Answer information about nodes
"""
- _header_format = '!L'
- _list_entry_format = '!H6s16sH'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, node_list):
- body = [pack(self._header_format, len(node_list))]
- list_entry_format = self._list_entry_format
- for node_type, address, uuid, state in node_list:
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- body.append(pack(list_entry_format, node_type, address, uuid,
- state))
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n,) = unpack(self._header_format, body[:offset])
- node_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- r = unpack(list_entry_format, body[offset:next_offset])
- offset = next_offset
- node_type, address, uuid, state = r
- address = _decodeAddress(address)
- node_type = _decodeNodeType(node_type)
- state = _decodeNodeState(state)
- uuid = _decodeUUID(uuid)
- node_list.append((node_type, address, uuid, state))
- return (node_list,)
+ _fmt = PStruct('ask_node_list',
+ PFNodeType,
+ )
+
+ _answer = PStruct('answer_node_list',
+ PFNodeList,
+ )
class SetNodeState(Packet):
"""
Set the node state
"""
- _header_format = '!16sHB'
+ _fmt = PStruct('set_node_state',
+ PUUID('uuid'),
+ PFNodeState,
+ PBoolean('modify_partition_table'),
+ )
- def _encode(self, uuid, state, modify_partition_table):
- uuid = _encodeUUID(uuid)
- return ''.join([pack(self._header_format, uuid, state,
- modify_partition_table)])
-
- def _decode(self, body):
- (uuid, state, modify) = unpack(self._header_format, body)
- state = _decodeNodeState(state)
- uuid = _decodeUUID(uuid)
- return (uuid, state, modify)
+ _answer = Error
class AddPendingNodes(Packet):
"""
Ask the primary to include some pending node in the partition table
"""
- _header_format = '!H'
- _list_header_format = '!16s'
- _list_header_len = calcsize(_list_header_format)
-
- def _encode(self, uuid_list=()):
- list_header_format = self._list_header_format
- # an empty list means all current pending nodes
- uuid_list = [pack(list_header_format, _encodeUUID(uuid)) \
- for uuid in uuid_list]
- return pack(self._header_format, len(uuid_list)) + ''.join(uuid_list)
-
- def _decode(self, body):
- header_len = self._header_len
- (n, ) = unpack(self._header_format, body[:header_len])
- list_header_format = self._list_header_format
- list_header_len = self._list_header_len
- uuid_list = [unpack(list_header_format,
- body[header_len+i*list_header_len:\
- header_len+(i+1)*list_header_len])[0] for i in xrange(n)]
- uuid_list = [_decodeUUID(x) for x in uuid_list]
- return (uuid_list, )
+ _fmt = PStruct('add_pending_nodes',
+ PFUUIDList,
+ )
+
+ _answer = Error
class NotifyNodeInformation(Packet):
"""
Notify information about one or more nodes. PM -> Any.
"""
- _header_format = '!L'
- _list_entry_format = '!H6s16sH'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, node_list):
- body = [pack(self._header_format, len(node_list))]
- list_entry_format = self._list_entry_format
- for node_type, address, uuid, state in node_list:
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- body.append(pack(list_entry_format, node_type, address, uuid,
- state))
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n,) = unpack(self._header_format, body[:offset])
- node_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- r = unpack(list_entry_format, body[offset:next_offset])
- offset = next_offset
- node_type, address, uuid, state = r
- address = _decodeAddress(address)
- node_type = _decodeNodeType(node_type)
- state = _decodeNodeState(state)
- uuid = _decodeUUID(uuid)
- node_list.append((node_type, address, uuid, state))
- return (node_list,)
+ _fmt = PStruct('notify_node_informations',
+ PFNodeList,
+ )
-class AskNodeInformation(Packet):
+class NodeInformation(Packet):
"""
Ask node information
"""
- pass
-
-class AnswerNodeInformation(Packet):
- """
- Answer node information
- """
- pass
+ _answer = PFEmpty
class SetClusterState(Packet):
"""
Set the cluster state
"""
- _header_format = '!H'
-
- def _encode(self, state):
- return pack(self._header_format, state)
+ _fmt = PStruct('set_cluster_state',
+ PEnum('state', ClusterStates),
+ )
- def _decode(self, body):
- (state, ) = unpack(self._header_format, body[:self._header_len])
- state = _decodeClusterState(state)
- return (state, )
+ _answer = Error
-class NotifyClusterInformation(Packet):
+class ClusterInformation(Packet):
"""
Notify information about the cluster
"""
- _header_format = '!H'
-
- def _encode(self, state):
- return pack(self._header_format, state)
-
- def _decode(self, body):
- (state, ) = unpack(self._header_format, body)
- state = _decodeClusterState(state)
- return (state, )
+ _fmt = PStruct('notify_cluster_information',
+ PEnum('state', ClusterStates),
+ )
-class AskClusterState(Packet):
+class ClusterState(Packet):
"""
Ask state of the cluster
- """
- pass
-
-class AnswerClusterState(Packet):
- """
Answer state of the cluster
"""
- _header_format = '!H'
- def _encode(self, state):
- return pack(self._header_format, state)
-
- def _decode(self, body):
- (state, ) = unpack(self._header_format, body)
- state = _decodeClusterState(state)
- return (state, )
+ _answer = PStruct('answer_cluster_state',
+ PEnum('state', ClusterStates),
+ )
class NotifyLastOID(Packet):
"""
Notify last OID generated
"""
- def _decode(self, body):
- (loid, ) = unpack('8s', body)
- return (loid, )
+ _fmt = PStruct('notify_last_oid',
+ POID('last_oid'),
+ )
-class AskObjectUndoSerial(Packet):
+class ObjectUndoSerial(Packet):
"""
Ask storage the serial where object data is when undoing given transaction,
for a list of OIDs.
C -> S
- """
- _header_format = '!8s8s8sL'
-
- def _encode(self, tid, ltid, undone_tid, oid_list):
- body = StringIO()
- write = body.write
- write(pack(self._header_format, tid, ltid, undone_tid, len(oid_list)))
- for oid in oid_list:
- write(oid)
- return body.getvalue()
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- tid, ltid, undone_tid, oid_list_len = unpack(self._header_format,
- read(self._header_len))
- oid_list = [read(8) for _ in xrange(oid_list_len)]
- return tid, ltid, undone_tid, oid_list
-
-class AnswerObjectUndoSerial(Packet):
- """
Answer serials at which object data is when undoing a given transaction.
object_tid_dict has the following format:
key: oid
@@ -1569,207 +1145,154 @@ class AnswerObjectUndoSerial(Packet):
If current_serial's data is current on storage.
S -> C
"""
- _header_format = '!L'
- _list_entry_format = '!8s8s8sB'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, object_tid_dict):
- body = StringIO()
- write = body.write
- write(pack(self._header_format, len(object_tid_dict)))
- list_entry_format = self._list_entry_format
- for oid, (current_serial, undo_serial, is_current) in \
- object_tid_dict.iteritems():
- if undo_serial is None:
- undo_serial = ZERO_TID
- write(pack(list_entry_format, oid, current_serial, undo_serial,
- is_current))
- return body.getvalue()
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- object_tid_dict = {}
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- object_tid_len = unpack(self._header_format, read(self._header_len))[0]
- for _ in xrange(object_tid_len):
- oid, current_serial, undo_serial, is_current = unpack(
- list_entry_format, read(list_entry_len))
- if undo_serial == ZERO_TID:
- undo_serial = None
- object_tid_dict[oid] = (current_serial, undo_serial,
- bool(is_current))
- return (object_tid_dict, )
+ _fmt = PStruct('ask_undo_transaction',
+ PTID('tid'),
+ PTID('ltid'),
+ PTID('undone_tid'),
+ PFOidList,
+ )
+
+ _answer = PStruct('answer_undo_transaction',
+ PDict('object_tid_dict',
+ POID('oid'),
+ PStruct('object_tid_value',
+ PTID('current_serial'),
+ PTID('undo_serial'),
+ PBoolean('is_current'),
+ ),
+ ),
+ )
-class AskHasLock(Packet):
+class HasLock(Packet):
"""
Ask a storage if oid is locked by another transaction.
C -> S
- """
- def _encode(self, tid, oid):
- return _encodeTID(tid) + _encodeTID(oid)
-
- def _decode(self, body):
- return (_decodeTID(body[:8]), _decodeTID(body[8:]))
-
-class AnswerHasLock(Packet):
- """
Answer whether a transaction holds the write lock for requested object.
"""
- _header_format = '!8sH'
-
- def _encode(self, oid, state):
- return pack(self._header_format, oid, state)
+ _fmt = PStruct('has_load_lock',
+ PTID('tid'),
+ POID('oid'),
+ )
- def _decode(self, body):
- oid, state = unpack(self._header_format, body)
- return (oid, _decodeLockState(state))
+ _answer = PStruct('answer_has_lock',
+ POID('oid'),
+ PEnum('lock_state', LockState),
+ )
-class AskCheckCurrentSerial(Packet):
+class CheckCurrentSerial(Packet):
"""
Verifies if given serial is current for object oid in the database, and
take a write lock on it (so that this state is not altered until
transaction ends).
- """
- _header_format = '!8s8s8s'
-
- def _encode(self, tid, serial, oid):
- return tid + serial + oid
-
- def _decode(self, body):
- return unpack(self._header_format, body)
-
-class AnswerCheckCurrentSerial(AnswerStoreObject):
- """
Answer to AskCheckCurrentSerial.
Same structure as AnswerStoreObject, to handle the same way, except there
is nothing to invalidate in any client's cache.
"""
- pass
+ _fmt = PStruct('ask_check_current_serial',
+ PTID('tid'),
+ PTID('serial'),
+ POID('oid'),
+ )
+
+ _answer = PStruct('answer_store_object',
+ PBoolean('conflicting'),
+ POID('oid'),
+ PTID('serial'),
+ )
-class AskBarrier(Packet):
+class Barrier(Packet):
"""
Initiates a "network barrier", allowing the node sending this packet to know
when all packets sent previously on the same connection have been handled
by its peer.
"""
- pass
-class AnswerBarrier(Packet):
- pass
+ _answer = PFEmpty
-class AskPack(Packet):
+class Pack(Packet):
"""
Request a pack at given TID.
C -> M
M -> S
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- return (_decodeTID(body), )
-
-class AnswerPack(Packet):
- """
Inform that packing is over.
S -> M
M -> C
"""
- _header_format = '!H'
-
- def _encode(self, status):
- return pack(self._header_format, int(status))
+ _fmt = PStruct('ask_pack',
+ PTID('tid'),
+ )
- def _decode(self, body):
- return (bool(unpack(self._header_format, body)[0]), )
+ _answer = PStruct('answer_pack',
+ PBoolean('status'),
+ )
-class AskCheckTIDRange(Packet):
+class CheckTIDRange(Packet):
"""
Ask some stats about a range of transactions.
Used to know if there are differences between a replicating node and
reference node.
S -> S
- """
- _header_format = '!8s8sLL'
-
- def _encode(self, min_tid, max_tid, length, partition):
- return pack(self._header_format, min_tid, max_tid, length, partition)
-
- def _decode(self, body):
- # min_tid, max_tid, length, partition
- return unpack(self._header_format, body)
-
-class AnswerCheckTIDRange(Packet):
- """
Stats about a range of transactions.
Used to know if there are differences between a replicating node and
reference node.
S -> S
"""
- _header_format = '!8sLLQ8s'
- def _encode(self, min_tid, length, count, tid_checksum, max_tid):
- return pack(self._header_format, min_tid, length, count, tid_checksum,
- max_tid)
-
- def _decode(self, body):
- # min_tid, length, partition, count, tid_checksum, max_tid
- return unpack(self._header_format, body)
+ _fmt = PStruct('ask_check_tid_range',
+ PTID('min_tid'),
+ PTID('max_tid'),
+ PNumber('length'),
+ PNumber('partition'),
+ )
+
+ _answer = PStruct('answer_check_tid_range',
+ PTID('min_tid'),
+ PNumber('length'),
+ PNumber('count'),
+ PChecksum('checksum'),
+ PTID('max_tid'),
+ )
-class AskCheckSerialRange(Packet):
+class CheckSerialRange(Packet):
"""
Ask some stats about a range of object history.
Used to know if there are differences between a replicating node and
reference node.
S -> S
- """
- _header_format = '!8s8s8sLL'
-
- def _encode(self, min_oid, min_serial, max_tid, length, partition):
- return pack(self._header_format, min_oid, min_serial, max_tid, length,
- partition)
-
- def _decode(self, body):
- # min_oid, min_serial, max_tid, length, partition
- return unpack(self._header_format, body)
-
-class AnswerCheckSerialRange(Packet):
- """
Stats about a range of object history.
Used to know if there are differences between a replicating node and
reference node.
S -> S
"""
- _header_format = '!8s8sLLQ8sQ8s'
+ _fmt = PStruct('ask_check_serial_range',
+ POID('min_oid'),
+ PTID('min_serial'),
+ PTID('max_tid'),
+ PNumber('length'),
+ PNumber('partition'),
+ )
- def _encode(self, min_oid, min_serial, length, count, oid_checksum,
- max_oid, serial_checksum, max_serial):
- return pack(self._header_format, min_oid, min_serial, length, count,
- oid_checksum, max_oid, serial_checksum, max_serial)
-
- def _decode(self, body):
- # min_oid, min_serial, length, count, oid_checksum, max_oid,
- # serial_checksum, max_serial
- return unpack(self._header_format, body)
+ _answer = PStruct('answer_check_serial_range',
+ POID('min_oid'),
+ PTID('min_serial'),
+ PNumber('length'),
+ PNumber('count'),
+ PChecksum('oid_checksum'),
+ POID('max_oid'),
+ PChecksum('serial_checksum'),
+ PTID('max_serial'),
+ )
-class AskLastTransaction(Packet):
+class LastTransaction(Packet):
"""
Ask last committed TID.
C -> M
- """
- pass
-
-class AnswerLastTransaction(Packet):
- """
Answer last committed TID.
M -> C
"""
- def _encode(self, tid):
- return tid
- def _decode(self, body):
- return (body, )
+ _answer = PStruct('answer_last_transaction',
+ PTID('tid'),
+ )
class NotifyReady(Packet):
"""
@@ -1778,38 +1301,12 @@ class NotifyReady(Packet):
"""
pass
-class Error(Packet):
- """
- Error is a special type of message, because this can be sent against
- any other message, even if such a message does not expect a reply
- usually. Any -> Any.
- """
- _header_format = '!H'
-
- def _encode(self, code, message):
- return pack(self._header_format, code) + _encodeString(message)
-
- def _decode(self, body):
- offset = self._header_len
- (code, ) = unpack(self._header_format, body[:offset])
- code = _decodeErrorCode(code)
- (message, _) = _decodeString(body, 'message', offset=offset)
- return (code, message)
-
-
-def initMessage(klass):
- if klass._header_format is not None:
- klass._header_len = calcsize(klass._header_format)
-
StaticRegistry = {}
def register(code, request, answer=None, ignore_when_closed=None):
""" Register a packet in the packet registry """
# register the request
- # assert code & RESPONSE_MASK == 0
assert code not in StaticRegistry, "Duplicate request packet code"
- initMessage(request)
request._code = code
- request._answer = answer
StaticRegistry[code] = request
if ignore_when_closed is None:
# By default, on a closed connection:
@@ -1818,17 +1315,21 @@ def register(code, request, answer=None,
# - nofitication: keep
ignore_when_closed = answer is not None
request._ignore_when_closed = ignore_when_closed
- if answer not in (None, Error):
- initMessage(answer)
- # compute the answer code
- code = code | RESPONSE_MASK
- answer._request = request
- answer._code = code
- # and register the answer packet
- assert code not in StaticRegistry, "Duplicate response packet code"
- StaticRegistry[code] = answer
- return (request, answer)
- return request
+ if request._answer in (Error, None):
+ return request
+ # build a class for the answer
+ answer = ClassType('Answer%s' % (request.__name__, ), (Packet, ), {})
+ answer._fmt = request._answer
+ # compute the answer code
+ code = code | RESPONSE_MASK
+ answer._request = request
+ assert answer._code is None, "Answer of %s is already used" % (request, )
+ answer._code = code
+ request._answer = answer
+ # and register the answer packet
+ assert code not in StaticRegistry, "Duplicate response packet code"
+ StaticRegistry[code] = answer
+ return (request, answer)
class ParserState(object):
"""
@@ -1850,7 +1351,6 @@ class PacketRegistry(dict):
"""
Packet registry that check packet code unicity and provide an index
"""
-
def __init__(self):
dict.__init__(self)
# load packet classes
@@ -1859,10 +1359,10 @@ class PacketRegistry(dict):
def parse(self, buf, state_container):
state = state_container.get()
if state is None:
- header = buf.read(PACKET_HEADER_SIZE)
+ header = buf.read(PACKET_HEADER_FORMAT.size)
if header is None:
return None
- msg_id, msg_type, msg_len = unpack(PACKET_HEADER_FORMAT, header)
+ msg_id, msg_type, msg_len = PACKET_HEADER_FORMAT.unpack(header)
try:
packet_klass = self[msg_type]
except KeyError:
@@ -1871,7 +1371,7 @@ class PacketRegistry(dict):
raise PacketMalformedError('message too big (%d)' % msg_len)
if msg_len < MIN_PACKET_SIZE:
raise PacketMalformedError('message too small (%d)' % msg_len)
- msg_len -= PACKET_HEADER_SIZE
+ msg_len -= PACKET_HEADER_FORMAT.size
else:
msg_id, packet_klass, msg_len = state
data = buf.read(msg_len)
@@ -1886,180 +1386,113 @@ class PacketRegistry(dict):
packet.setContent(msg_id, data)
return packet
- # packets registration
- Error = register(0x8000, Error)
- Notify = register(0x0032, Notify)
+ # notifications
+ Error = register(
+ 0x8000, Error)
Ping, Pong = register(
- 0x0001,
- Ping,
- Pong)
+ 0x0001, Ping)
+ Notify = register(
+ 0x0002, Notify)
RequestIdentification, AcceptIdentification = register(
- 0x0002,
- RequestIdentification,
- AcceptIdentification)
+ 0x0003, RequestIdentification)
AskPrimary, AnswerPrimary = register(
- 0x0003,
- AskPrimary,
- AnswerPrimary)
- AnnouncePrimary = register(0x0004, AnnouncePrimary)
- ReelectPrimary = register(0x0005, ReelectPrimary)
- NotifyNodeInformation = register(0x0006, NotifyNodeInformation)
+ 0x0004, PrimaryMaster)
+ AnnouncePrimary = register(
+ 0x0005, AnnouncePrimary)
+ ReelectPrimary = register(
+ 0x0006, ReelectPrimary)
+ NotifyNodeInformation = register(
+ 0x0007, NotifyNodeInformation)
AskLastIDs, AnswerLastIDs = register(
- 0x0007,
- AskLastIDs,
- AnswerLastIDs)
+ 0x0008, LastIDs)
AskPartitionTable, AnswerPartitionTable = register(
- 0x0008,
- AskPartitionTable,
- AnswerPartitionTable)
- SendPartitionTable = register(0x0009, SendPartitionTable)
- NotifyPartitionChanges = register(0x000A, NotifyPartitionChanges)
- StartOperation = register(0x000B, StartOperation)
- StopOperation = register(0x000C, StopOperation)
+ 0x0009, PartitionTable)
+ SendPartitionTable = register(
+ 0x000A, NotifyPartitionTable)
+ NotifyPartitionChanges = register(
+ 0x000B, PartitionChanges)
+ StartOperation = register(
+ 0x000C, StartOperation)
+ StopOperation = register(
+ 0x000D, StopOperation)
AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
- 0x000D,
- AskUnfinishedTransactions,
- AnswerUnfinishedTransactions)
+ 0x000E, UnfinishedTransactions)
AskObjectPresent, AnswerObjectPresent = register(
- 0x000f,
- AskObjectPresent,
- AnswerObjectPresent)
- DeleteTransaction = register(0x0010, DeleteTransaction)
- CommitTransaction = register(0x0011, CommitTransaction)
+ 0x000F, ObjectPresent)
+ DeleteTransaction = register(
+ 0x0010, DeleteTransaction)
+ CommitTransaction = register(
+ 0x0011, CommitTransaction)
AskBeginTransaction, AnswerBeginTransaction = register(
- 0x0012,
- AskBeginTransaction,
- AnswerBeginTransaction)
+ 0x0012, BeginTransaction)
AskFinishTransaction, AnswerTransactionFinished = register(
- 0x0013,
- AskFinishTransaction,
- AnswerTransactionFinished,
- ignore_when_closed=False,
- )
+ 0x0013, FinishTransaction, ignore_when_closed=False)
AskLockInformation, AnswerInformationLocked = register(
- 0x0014,
- AskLockInformation,
- AnswerInformationLocked,
- )
- InvalidateObjects = register(0x0015, InvalidateObjects)
- NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation)
+ 0x0014, LockInformation, ignore_when_closed=False)
+ InvalidateObjects = register(
+ 0x0015, InvalidateObjects)
+ NotifyUnlockInformation = register(
+ 0x0016, UnlockInformation)
AskNewOIDs, AnswerNewOIDs = register(
- 0x0017,
- AskNewOIDs,
- AnswerNewOIDs)
+ 0x0017, GenerateOIDs)
AskStoreObject, AnswerStoreObject = register(
- 0x0018,
- AskStoreObject,
- AnswerStoreObject)
- AbortTransaction = register(0x0019, AbortTransaction)
+ 0x0018, StoreObject)
+ AbortTransaction = register(
+ 0x0019, AbortTransaction)
AskStoreTransaction, AnswerStoreTransaction = register(
- 0x001A,
- AskStoreTransaction,
- AnswerStoreTransaction)
+ 0x001A, StoreTransaction)
AskObject, AnswerObject = register(
- 0x001B,
- AskObject,
- AnswerObject)
+ 0x001B, GetObject)
AskTIDs, AnswerTIDs = register(
- 0x001C,
- AskTIDs,
- AnswerTIDs)
+ 0x001C, TIDList)
AskTransactionInformation, AnswerTransactionInformation = register(
- 0x001E,
- AskTransactionInformation,
- AnswerTransactionInformation)
+ 0x001D, TransactionInformation)
AskObjectHistory, AnswerObjectHistory = register(
- 0x001F,
- AskObjectHistory,
- AnswerObjectHistory)
+ 0x001E, ObjectHistory)
AskPartitionList, AnswerPartitionList = register(
- 0x0021,
- AskPartitionList,
- AnswerPartitionList)
+ 0x001F, PartitionList)
AskNodeList, AnswerNodeList = register(
- 0x0022,
- AskNodeList,
- AnswerNodeList)
+ 0x0020, NodeList)
SetNodeState = register(
- 0x0023,
- SetNodeState,
- Error,
- ignore_when_closed=False,
- )
+ 0x0021, SetNodeState, ignore_when_closed=False)
AddPendingNodes = register(
- 0x0024,
- AddPendingNodes,
- Error,
- ignore_when_closed=False,
- )
+ 0x0022, AddPendingNodes, ignore_when_closed=False)
AskNodeInformation, AnswerNodeInformation = register(
- 0x0025,
- AskNodeInformation,
- AnswerNodeInformation)
+ 0x0023, NodeInformation)
SetClusterState = register(
- 0x0026,
- SetClusterState,
- Error,
- ignore_when_closed=False,
- )
- NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
+ 0x0024, SetClusterState, ignore_when_closed=False)
+ NotifyClusterInformation = register(
+ 0x0025, ClusterInformation)
AskClusterState, AnswerClusterState = register(
- 0x0028,
- AskClusterState,
- AnswerClusterState)
- NotifyLastOID = register(0x0030, NotifyLastOID)
- NotifyReplicationDone = register(0x0031, NotifyReplicationDone)
+ 0x0026, ClusterState)
+ NotifyLastOID = register(
+ 0x0027, NotifyLastOID)
+ NotifyReplicationDone = register(
+ 0x0028, ReplicationDone)
AskObjectUndoSerial, AnswerObjectUndoSerial = register(
- 0x0033,
- AskObjectUndoSerial,
- AnswerObjectUndoSerial)
+ 0x0029, ObjectUndoSerial)
AskHasLock, AnswerHasLock = register(
- 0x0034,
- AskHasLock,
- AnswerHasLock)
+ 0x002A, HasLock)
AskTIDsFrom, AnswerTIDsFrom = register(
- 0x0035,
- AskTIDsFrom,
- AnswerTIDsFrom)
+ 0x002B, TIDListFrom)
AskObjectHistoryFrom, AnswerObjectHistoryFrom = register(
- 0x0036,
- AskObjectHistoryFrom,
- AnswerObjectHistoryFrom)
+ 0x002C, ObjectHistoryFrom)
AskBarrier, AnswerBarrier = register(
- 0x0037,
- AskBarrier,
- AnswerBarrier)
+ 0x002D, Barrier)
AskPack, AnswerPack = register(
- 0x0038,
- AskPack,
- AnswerPack,
- ignore_when_closed=False,
- )
+ 0x002E, Pack, ignore_when_closed=False)
AskCheckTIDRange, AnswerCheckTIDRange = register(
- 0x0039,
- AskCheckTIDRange,
- AnswerCheckTIDRange,
- )
+ 0x002F, CheckTIDRange)
AskCheckSerialRange, AnswerCheckSerialRange = register(
- 0x003A,
- AskCheckSerialRange,
- AnswerCheckSerialRange,
- )
- NotifyReady = register(0x003B, NotifyReady)
+ 0x0030, CheckSerialRange)
+ NotifyReady = register(
+ 0x0031, NotifyReady)
AskLastTransaction, AnswerLastTransaction = register(
- 0x003C,
- AskLastTransaction,
- AnswerLastTransaction,
- )
+ 0x0032, LastTransaction)
AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
- 0x003D,
- AskCheckCurrentSerial,
- AnswerCheckCurrentSerial,
- )
+ 0x0033, CheckCurrentSerial)
NotifyTransactionFinished = register(
- 0x003E,
- NotifyTransactionFinished,
- )
+ 0x003E, NotifyTransactionFinished)
# build a "singleton"
Packets = PacketRegistry()
@@ -2073,7 +1506,6 @@ class ErrorRegistry(dict):
"""
Error packet packet registry
"""
-
def __init__(self):
dict.__init__(self)
Modified: trunk/neo/lib/util.py
==============================================================================
--- trunk/neo/lib/util.py [iso-8859-1] (original)
+++ trunk/neo/lib/util.py [iso-8859-1] Tue Feb 8 16:47:10 2011
@@ -22,6 +22,23 @@ from zlib import adler32
from Queue import deque
from struct import pack, unpack
+try:
+ from struct import Struct
+except ImportError:
+ import struct
+ # support for python 2.4
+ class Struct(object):
+
+ def __init__(self, fmt):
+ self._fmt = fmt
+ self.size = struct.calcsize(fmt)
+
+ def pack(self, *args):
+ return struct.pack(self._fmt, *args)
+
+ def unpack(self, *args):
+ return struct.unpack(self._fmt, *args)
+
def u64(s):
return unpack('!Q', s)[0]
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Feb 8 16:47:10 2011
@@ -70,9 +70,9 @@ class ProtocolTests(NeoUnitTestBase):
def test_11_RequestIdentification(self):
uuid = self.getNewUUID()
- p = Packets.RequestIdentification(NodeTypes.CLIENT, uuid,
- ("127.0.0.1", 9080), "unittest")
- node, p_uuid, (ip, port), name = p.decode()
+ p = Packets.RequestIdentification(NodeTypes.CLIENT,
+ uuid, ("127.0.0.1", 9080), "unittest")
+ (plow, phigh), node, p_uuid, (ip, port), name = p.decode()
self.assertEqual(node, NodeTypes.CLIENT)
self.assertEqual(p_uuid, uuid)
self.assertEqual(ip, "127.0.0.1")
@@ -148,9 +148,11 @@ class ProtocolTests(NeoUnitTestBase):
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
uuid3 = self.getNewUUID()
- cell_list = [(0, ((uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE))),
- (43, ((uuid2, CellStates.OUT_OF_DATE),(uuid3, CellStates.DISCARDED))),
- (124, ((uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)))]
+ cell_list = [
+ (0, [(uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE)]),
+ (43, [(uuid2, CellStates.OUT_OF_DATE), (uuid3, CellStates.DISCARDED)]),
+ (124, [(uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)]),
+ ]
p = Packets.AnswerPartitionTable(ptid, cell_list)
pptid, p_cell_list = p.decode()
self.assertEqual(pptid, ptid)
@@ -161,9 +163,11 @@ class ProtocolTests(NeoUnitTestBase):
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
uuid3 = self.getNewUUID()
- cell_list = [(0, ((uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE))),
- (43, ((uuid2, CellStates.OUT_OF_DATE),(uuid3, CellStates.DISCARDED))),
- (124, ((uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)))]
+ cell_list = [
+ (0, [(uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE)]),
+ (43, [(uuid2, CellStates.OUT_OF_DATE), (uuid3, CellStates.DISCARDED)]),
+ (124, [(uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)]),
+ ]
p = Packets.AnswerPartitionTable(ptid, cell_list)
pptid, p_cell_list = p.decode()
self.assertEqual(pptid, ptid)
@@ -176,8 +180,7 @@ class ProtocolTests(NeoUnitTestBase):
cell_list = [(0, uuid1, CellStates.UP_TO_DATE),
(43, uuid2, CellStates.OUT_OF_DATE),
(124, uuid1, CellStates.DISCARDED)]
- p = Packets.NotifyPartitionChanges(ptid,
- cell_list)
+ p = Packets.NotifyPartitionChanges(ptid, cell_list)
pptid, p_cell_list = p.decode()
self.assertEqual(pptid, ptid)
self.assertEqual(p_cell_list, cell_list)
@@ -235,7 +238,6 @@ class ProtocolTests(NeoUnitTestBase):
ptid = p.decode()[0]
self.assertEqual(ptid, tid)
-
def test_32_askBeginTransaction(self):
tid = self.getNextTID()
p = Packets.AskBeginTransaction(tid)
@@ -370,11 +372,11 @@ class ProtocolTests(NeoUnitTestBase):
def test_46_answerStoreObject(self):
oid = self.getNextTID()
serial = self.getNextTID()
- p = Packets.AnswerStoreObject(1, oid, serial)
+ p = Packets.AnswerStoreObject(True, oid, serial)
conflicting, poid, pserial = p.decode()
self.assertEqual(oid, poid)
self.assertEqual(serial, pserial)
- self.assertEqual(conflicting, 1)
+ self.assertTrue(conflicting)
def test_47_askObject(self):
oid = self.getNextTID()
@@ -532,7 +534,7 @@ class ProtocolTests(NeoUnitTestBase):
def test_AddPendingNodes(self):
uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
- p = Packets.AddPendingNodes([uuid1, uuid2])
+ p = Packets.AddPendingNodes((uuid1, uuid2))
self.assertEqual(p.decode(), ([uuid1, uuid2], ))
def test_SetNodeState(self):
@@ -551,7 +553,7 @@ class ProtocolTests(NeoUnitTestBase):
self.getNewUUID(), NodeStates.DOWN)
node2 = (NodeTypes.MASTER, ('127.0.0.1', 2000),
self.getNewUUID(), NodeStates.RUNNING)
- p = Packets.AnswerNodeList([node1, node2])
+ p = Packets.AnswerNodeList((node1, node2))
self.assertEqual(p.decode(), ([node1, node2], ))
def test_AskPartitionList(self):
@@ -564,14 +566,14 @@ class ProtocolTests(NeoUnitTestBase):
def test_AnswerPartitionList(self):
ptid = self.getPTID(1)
row_list = [
- (0, (
+ (0, [
(self.getNewUUID(), CellStates.UP_TO_DATE),
(self.getNewUUID(), CellStates.OUT_OF_DATE),
- )),
- (1, (
+ ]),
+ (1, [
(self.getNewUUID(), CellStates.FEEDING),
(self.getNewUUID(), CellStates.DISCARDED),
- )),
+ ]),
]
p = Packets.AnswerPartitionList(ptid, row_list)
self.assertEqual(p.decode(), (ptid, row_list))
More information about the Neo-report mailing list