[Neo-report] r2652 gregory - in /trunk/neo: client/handlers/ lib/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Feb 8 16:47:10 CET 2011


Author: gregory
Date: Tue Feb  8 16:47:10 2011
New Revision: 2652

Log:
New protocol parser, semantic oriented.

Modified:
    trunk/neo/client/handlers/master.py
    trunk/neo/lib/handler.py
    trunk/neo/lib/logger.py
    trunk/neo/lib/protocol.py
    trunk/neo/lib/util.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Tue Feb  8 16:47:10 2011
@@ -160,7 +160,7 @@ class PrimaryAnswersHandler(AnswerBaseHa
         self.app.setHandlerData(ttid)
 
     def answerNewOIDs(self, conn, oid_list):
-        self.app.new_oid_list = oid_list
+        self.app.new_oid_list = list(oid_list)
 
     def answerTransactionFinished(self, conn, _, tid):
         self.app.setHandlerData(tid)

Modified: trunk/neo/lib/handler.py
==============================================================================
--- trunk/neo/lib/handler.py [iso-8859-1] (original)
+++ trunk/neo/lib/handler.py [iso-8859-1] Tue Feb  8 16:47:10 2011
@@ -133,10 +133,13 @@ class EventHandler(object):
     def notify(self, conn, message):
         neo.lib.logging.info('notification from %r: %s', conn, message)
 
-    def requestIdentification(self, conn, node_type,
-                                        uuid, address, name):
+    def requestIdentification(self, conn, node_type, uuid, address, name):
         raise UnexpectedPacketError
 
+    def _requestIdentification(self, conn, protocol, node_type,
+            uuid, address, name):
+        self.requestIdentification(conn, node_type, uuid, address, name)
+
     def acceptIdentification(self, conn, node_type,
                        uuid, num_partitions, num_replicas, your_uuid):
         raise UnexpectedPacketError
@@ -428,7 +431,7 @@ class EventHandler(object):
 
         d[Packets.Error] = self.error
         d[Packets.Notify] = self.notify
-        d[Packets.RequestIdentification] = self.requestIdentification
+        d[Packets.RequestIdentification] = self._requestIdentification
         d[Packets.AcceptIdentification] = self.acceptIdentification
         d[Packets.AskPrimary] = self.askPrimary
         d[Packets.AnswerPrimary] = self.answerPrimary

Modified: trunk/neo/lib/logger.py
==============================================================================
--- trunk/neo/lib/logger.py [iso-8859-1] (original)
+++ trunk/neo/lib/logger.py [iso-8859-1] Tue Feb  8 16:47:10 2011
@@ -37,8 +37,11 @@ class PacketLogger(object):
         klass = packet.getType()
         uuid = dump(conn.getUUID())
         ip, port = conn.getAddress()
+        packet_name = packet.__class__.__name__
+        if packet.isResponse() and packet._request is not None:
+            packet_name += packet._request.__name__
         neo.lib.logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
-                packet.__class__.__name__, direction, uuid, ip, port)
+                packet_name, direction, uuid, ip, port)
         # look for custom packet logger
         logger = self.packet_dispatch_table.get(klass, None)
         logger = logger and getattr(self, logger.im_func.__name__, None)

Modified: trunk/neo/lib/protocol.py
==============================================================================
--- trunk/neo/lib/protocol.py [iso-8859-1] (original)
+++ trunk/neo/lib/protocol.py [iso-8859-1] Tue Feb  8 16:47:10 2011
@@ -15,12 +15,13 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-from struct import pack, unpack, error, calcsize
+import sys
+import traceback
+from types import ClassType
 from socket import inet_ntoa, inet_aton
-from neo.lib.profiling import profiler_decorator
 from cStringIO import StringIO
 
-from neo.lib.util import Enum
+from neo.lib.util import Enum, Struct
 
 # The protocol version (major, minor).
 PROTOCOL_VERSION = (4, 1)
@@ -28,13 +29,13 @@ PROTOCOL_VERSION = (4, 1)
 # Size restrictions.
 MIN_PACKET_SIZE = 10
 MAX_PACKET_SIZE = 0x4000000
-PACKET_HEADER_FORMAT = '!LHL'
-PACKET_HEADER_SIZE = calcsize(PACKET_HEADER_FORMAT)
+PACKET_HEADER_FORMAT = Struct('!LHL')
 # Check that header size is the expected value.
 # If it is not, it means that struct module result is incompatible with
 # "reference" platform (python 2.4 on x86-64).
-assert PACKET_HEADER_SIZE == 10, \
-    'Unsupported platform, packet header length = %i' % (PACKET_HEADER_SIZE, )
+assert PACKET_HEADER_FORMAT.size == 10, \
+    'Unsupported platform, packet header length = %i' % \
+    (PACKET_HEADER_FORMAT.size, )
 RESPONSE_MASK = 0x8000
 
 class ErrorCodes(Enum):
@@ -109,6 +110,7 @@ INVALID_UUID = '\0' * 16
 INVALID_TID = '\xff' * 8
 INVALID_OID = '\xff' * 8
 INVALID_PARTITION = 0xffffffff
+INVALID_ADDRESS = ('0.0.0.0', 0)
 ZERO_TID = '\0' * 8
 ZERO_OID = '\0' * 8
 OID_LEN = len(INVALID_OID)
@@ -141,128 +143,42 @@ class BrokenNodeDisallowedError(Protocol
     """ Just close the connection """
     pass
 
-
-# packet parser
-def _decodeClusterState(state):
-    cluster_state = ClusterStates.get(state)
-    if cluster_state is None:
-        raise PacketMalformedError('invalid cluster state %d' % state)
-    return cluster_state
-
-def _decodeNodeState(state):
-    node_state = NodeStates.get(state)
-    if node_state is None:
-        raise PacketMalformedError('invalid node state %d' % state)
-    return node_state
-
-def _decodeNodeType(original_node_type):
-    node_type = NodeTypes.get(original_node_type)
-    if node_type is None:
-        raise PacketMalformedError('invalid node type %d' % original_node_type)
-    return node_type
-
-def _decodeErrorCode(original_error_code):
-    error_code = ErrorCodes.get(original_error_code)
-    if error_code is None:
-        raise PacketMalformedError('invalid error code %d' %
-                original_error_code)
-    return error_code
-
-def _decodeLockState(original_lock_state):
-    lock_state = LockState.get(original_lock_state)
-    if lock_state is None:
-        raise PacketMalformedError('invalid lock state %d' % (
-            original_lock_state, ))
-    return lock_state
-
-def _decodeAddress(address):
-    if address == '\0' * 6:
-        return None
-    (ip, port) = unpack('!4sH', address)
-    return (inet_ntoa(ip), port)
-
-def _encodeAddress(address):
-    if address is None:
-        return '\0' * 6
-    # address is a tuple (ip, port)
-    return pack('!4sH', inet_aton(address[0]), address[1])
-
-def _decodeUUID(uuid):
-    if uuid == INVALID_UUID:
-        return None
-    return uuid
-
-def _encodeUUID(uuid):
-    if uuid is None:
-        return INVALID_UUID
-    return uuid
-
-def _decodePTID(ptid):
-    ptid = unpack('!Q', ptid)[0]
-    if ptid == 0:
-        return None
-    return ptid
-
-def _encodePTID(ptid):
-    if ptid is None:
-        ptid = 0
-    assert isinstance(ptid, (int, long)), ptid
-    return pack('!Q', ptid)
-
-def _decodeTID(tid):
-    if tid == INVALID_TID:
-        return None
-    return tid
-
-def _encodeTID(tid):
-    if tid is None:
-        return INVALID_TID
-    return tid
-
-def _decodeString(buf, name, offset=0):
-    buf = buf[offset:]
-    (size, ) = unpack('!L', buf[:4])
-    string = buf[4:4+size]
-    if len(string) != size:
-        raise PacketMalformedError("can't read string <%s>" % name)
-    return (string, buf[offset+4+size:])
-
- at profiler_decorator
-def _encodeString(buf):
-    return pack('!L', len(buf)) + buf
-
 class Packet(object):
     """
-    Base class for any packet definition.
-    Each subclass should override _encode() and _decode() and return a string or
-    a tuple respectively.
+        Base class for any packet definition. The _fmt class attribute must be
+        defined for any non-empty packet.
     """
-
     _ignore_when_closed = False
-    _header_format = None
-    _header_len = None
     _request = None
     _answer = None
     _body = None
     _code = None
+    _fmt = None
     _id = None
 
     def __init__(self, *args, **kw):
+        args = list(args)
         assert self._code is not None, "Packet class not registered"
-        if args != () or kw != {}:
-            body = self._encode(*args, **kw)
+        if args or kw:
+            assert self._fmt is not None
+            buf = StringIO()
+            # load named arguments
+            for item in self._fmt._items[len(args):]:
+                args.append(kw.get(item._name))
+            self._fmt.encode(buf.write, args)
+            body = buf.getvalue()
         else:
             body = ''
         self._body = body
 
     def decode(self):
         assert self._body is not None
+        if self._fmt is None:
+            return ()
+        buf = StringIO(self._body)
         try:
-            return self._decode(self._body)
-        except error, msg: # struct.error
-            name = self.__class__.__name__
-            raise PacketMalformedError("%s fail (%s)" % (name, msg))
-        except PacketMalformedError, msg:
+            return self._fmt.decode(buf.read)
+        except ParseError, msg:
             name = self.__class__.__name__
             raise PacketMalformedError("%s fail (%s)" % (name, msg))
 
@@ -278,23 +194,17 @@ class Packet(object):
         assert self._id is not None, "No identifier applied on the packet"
         return self._id
 
-    def getCode(self):
-        return self._code
-
     def getType(self):
         return self.__class__
 
-    @profiler_decorator
     def encode(self):
         """ Encode a packet as a string to send it over the network """
         content = self._body
-        length = PACKET_HEADER_SIZE + len(content)
-        return (pack(PACKET_HEADER_FORMAT, self._id, self._code, length),
-            content)
+        length = PACKET_HEADER_FORMAT.size + len(content)
+        return (PACKET_HEADER_FORMAT.pack(self._id, self._code, length), content)
 
-    @profiler_decorator
     def __len__(self):
-        return PACKET_HEADER_SIZE + len(self._body)
+        return PACKET_HEADER_FORMAT.size + len(self._body)
 
     def __repr__(self):
         return '%s[%r]' % (self.__class__.__name__, self._id)
@@ -306,17 +216,6 @@ class Packet(object):
         assert isinstance(other, Packet)
         return self._code == other._code
 
-    def _encode(self, *args, **kw):
-        """ Default encoder, join all arguments """
-        args = list(args)
-        args.extend(kw.values())
-        return ''.join([str(i) for i in args] or '')
-
-    def _decode(self, body):
-        """ Default decoder, message must be empty """
-        assert body == '', "Non-empty packet decoding not implemented """
-        return ()
-
     def isError(self):
         return isinstance(self, Error)
 
@@ -333,1230 +232,907 @@ class Packet(object):
         """
         return self._ignore_when_closed
 
-class Notify(Packet):
+class ParseError(Exception):
     """
-        General purpose notification (remote logging)
+        An exception that encapsulate another and build the 'path' of the
+        packet item that generate the error.
     """
-    def _encode(self, message):
-        return message
+    def __init__(self, item, trace):
+        Exception.__init__(self)
+        self._trace = trace
+        self._items = [item]
 
-    def _decode(self, body):
-        return (body, )
+    def append(self, item):
+        self._items.append(item)
 
-class Ping(Packet):
+    def __repr__(self):
+        chain = '/'.join([item.getName() for item in reversed(self._items)])
+        return 'at %s:\n%s' % (chain, self._trace)
+
+    __str__ = __repr__
+
+# packet parsers
+
+class PItem(object):
     """
-    Check if a peer is still alive. Any -> Any.
+        Base class for any packet item, _encode and _decode must be overriden
+        by subclasses.
     """
-    pass
+    def __init__(self, name):
+        self._name = name
+
+    def __repr__(self):
+        return self.__class__.__name__
 
-class Pong(Packet):
+    def getName(self):
+        return self._name
+
+    def _trace(self, method, *args):
+        try:
+            return method(*args)
+        except ParseError, e:
+            # trace and forward exception
+            e.append(self)
+            raise
+        except Exception:
+            # original exception, encapsulate it
+            trace = ''.join(traceback.format_exception(*sys.exc_info())[2:])
+            raise ParseError(self, trace)
+
+    def encode(self, writer, items):
+        return self._trace(self._encode, writer, items)
+
+    def decode(self, reader):
+        return self._trace(self._decode, reader)
+
+    def _encode(self, writer):
+        raise NotImplementedError, self.__class__.__name__
+
+    def _decode(self, reader):
+        raise NotImplementedError, self.__class__.__name__
+
+class PStruct(PItem):
+    """
+        Aggregate other items
+    """
+    def __init__(self, name, *items):
+        PItem.__init__(self, name)
+        self._items = items
+
+    def _encode(self, writer, items):
+        assert len(self._items) == len(items), (items, self._items)
+        for item, value in zip(self._items, items):
+            item.encode(writer, value)
+
+    def _decode(self, reader):
+        return tuple([item.decode(reader) for item in self._items])
+
+class PStructItem(PItem):
+    """
+        A single value encoded with struct
+    """
+    def __init__(self, name, fmt):
+        PItem.__init__(self, name)
+        struct = Struct(fmt)
+        self.pack = struct.pack
+        self.unpack = struct.unpack
+        self.size = struct.size
+
+    def _encode(self, writer, value):
+        writer(self.pack(value))
+
+    def _decode(self, reader):
+        return self.unpack(reader(self.size))[0]
+
+class PList(PStructItem):
+    """
+        A list of homogeneous items
+    """
+    def __init__(self, name, item):
+        PStructItem.__init__(self, name, '!L')
+        self._item = item
+
+    def _encode(self, writer, items):
+        assert isinstance(items, (list, tuple, set)), (type(items), items)
+        writer(self.pack(len(items)))
+        item = self._item
+        for value in items:
+            item.encode(writer, value)
+
+    def _decode(self, reader):
+        length = self.unpack(reader(self.size))[0]
+        item = self._item
+        return [item.decode(reader) for _ in xrange(length)]
+
+class PDict(PStructItem):
+    """
+        A dictionary with custom key and value formats
+    """
+    def __init__(self, name, key, value):
+        PStructItem.__init__(self, name, '!L')
+        self._key = key
+        self._value = value
+
+    def _encode(self, writer, item):
+        assert isinstance(item , dict), (type(item), item)
+        writer(self.pack(len(item)))
+        key, value = self._key, self._value
+        for k, v in item.iteritems():
+            key.encode(writer, k)
+            value.encode(writer, v)
+
+    def _decode(self, reader):
+        length = self.unpack(reader(self.size))[0]
+        key, value = self._key, self._value
+        new_dict = {}
+        for _ in xrange(length):
+            k = key.decode(reader)
+            v = value.decode(reader)
+            new_dict[k] = v
+        return new_dict
+
+class PEnum(PStructItem):
     """
-    Notify being alive. Any -> Any.
+        Encapsulate an enumeration value
     """
-    pass
+    def __init__(self, name, enum):
+        PStructItem.__init__(self, name, '!L')
+        self._enum = enum
 
-class RequestIdentification(Packet):
+    def _encode(self, writer, item):
+        assert isinstance(item, int), item
+        writer(self.pack(item))
+
+    def _decode(self, reader):
+        code = self.unpack(reader(self.size))[0]
+        try:
+            return self._enum[code]
+        except KeyError:
+            enum = self._enum.__class__.__name__
+            raise ValueError, 'Invalid code for %s enum: %r' % (enum, code)
+
+class PAddress(PStructItem):
+    """
+        An host address (IPv4 for now)
+    """
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!4sH')
+
+    def _encode(self, writer, address):
+        if address is None:
+            address = INVALID_ADDRESS
+        assert len(address) == 2, address
+        host, port = address
+        host = inet_aton(host)
+        writer(self.pack(host, port))
+
+    def _decode(self, reader):
+        data = reader(self.size)
+        host, port = self.unpack(data)
+        host = inet_ntoa(host)
+        if (host, port) == INVALID_ADDRESS:
+            return None
+        return (host, port)
+
+class PString(PStructItem):
     """
-    Request a node identification. This must be the first packet for any
-    connection. Any -> Any.
+        A variable-length string
+    """
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!L')
+
+    def _encode(self, writer, value):
+        writer(self.pack(len(value)))
+        writer(value)
+
+    def _decode(self, reader):
+        length = self.unpack(reader(self.size))[0]
+        return reader(length)
+
+class PBoolean(PStructItem):
+    """
+        A boolean value, encoded as a single byte
+    """
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!B')
+
+    def _encode(self, writer, value):
+        writer(self.pack(bool(value)))
+
+    def _decode(self, reader):
+        return bool(self.unpack(reader(self.size))[0])
+
+class PNumber(PStructItem):
+    """
+        A integer number (4-bytes length)
+    """
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!L')
+
+class PChecksum(PStructItem):
+    """
+        A checksum
+    """
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!Q')
+
+class PIndex(PStructItem):
+    """
+        A big integer to defined indexes in a huge list.
+    """
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!Q')
+
+class PPTID(PStructItem):
+    """
+        A None value means an invalid PTID
     """
-    _header_format = '!LLH16s6s'
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!Q')
 
-    def _encode(self, node_type, uuid, address, name):
-        uuid = _encodeUUID(uuid)
-        address = _encodeAddress(address)
-        return pack(self._header_format, PROTOCOL_VERSION[0],
-                          PROTOCOL_VERSION[1], node_type, uuid, address) + \
-                          _encodeString(name)
-
-    def _decode(self, body):
-        r = unpack(self._header_format, body[:self._header_len])
-        major, minor, node_type, uuid, address = r
-        address = _decodeAddress(address)
-        (name, _) = _decodeString(body, 'name', offset=self._header_len)
-        node_type = _decodeNodeType(node_type)
-        uuid = _decodeUUID(uuid)
+    def _encode(self, writer, value):
+        if value is None:
+            value = 0
+        PStructItem._encode(self, writer, value)
+
+    def _decode(self, reader):
+        value = PStructItem._decode(self, reader)
+        if value == 0:
+            value = None
+        return value
+
+class PProtocol(PStructItem):
+    """
+        The protocol version definition
+    """
+    def __init__(self, name):
+        PStructItem.__init__(self, name, '!LL')
+
+    def _encode(self, writer, version):
+        writer(self.pack(*version))
+
+    def _decode(self, reader):
+        major, minor = self.unpack(reader(self.size))
         if (major, minor) != PROTOCOL_VERSION:
-            raise PacketMalformedError('protocol version mismatch')
-        return (node_type, uuid, address, name)
+            raise ProtocolError('protocol version mismatch')
+        return (major, minor)
 
-class AcceptIdentification(Packet):
+class PUUID(PItem):
     """
-    Accept a node identification. This should be a reply to Request Node
-    Identification. Any -> Any.
+        An UUID (node identifier)
     """
-    _header_format = '!H16sLL16s'
-
-    def _encode(self, node_type, uuid,
-             num_partitions, num_replicas, your_uuid):
-        uuid = _encodeUUID(uuid)
-        your_uuid = _encodeUUID(your_uuid)
-        return pack(self._header_format, node_type, uuid,
-                          num_partitions, num_replicas, your_uuid)
-
-    def _decode(self, body):
-        r = unpack(self._header_format, body)
-        node_type, uuid, num_partitions, num_replicas, your_uuid = r
-        node_type = _decodeNodeType(node_type)
-        uuid = _decodeUUID(uuid)
-        your_uuid = _decodeUUID(your_uuid)
-        return (node_type, uuid, num_partitions, num_replicas, your_uuid)
+    def _encode(self, writer, uuid):
+        if uuid is None:
+            uuid = INVALID_UUID
+        assert len(uuid) == 16, (len(uuid), uuid)
+        writer(uuid)
+
+    def _decode(self, reader):
+        uuid = reader(16)
+        if uuid == INVALID_UUID:
+            uuid = None
+        return uuid
+
+class PTID(PItem):
+    """
+        A transaction identifier
+    """
+    def _encode(self, writer, tid):
+        if tid is None:
+            tid = INVALID_TID
+        assert len(tid) == 8, (len(tid), tid)
+        writer(tid)
+
+    def _decode(self, reader):
+        tid = reader(8)
+        if tid == INVALID_TID:
+            tid = None
+        return tid
+
+# same definition, for now
+POID = PTID
+
+# common definitions
+
+PFEmpty = PStruct('no_content')
+PFNodeType = PEnum('type', NodeTypes)
+PFNodeState = PEnum('state', NodeStates)
+PFCellState = PEnum('state', CellStates)
+
+PFNodeList = PList('node_list',
+    PStruct('node',
+        PFNodeType,
+        PAddress('address'),
+        PUUID('uuid'),
+        PFNodeState,
+    ),
+)
+
+PFCellList = PList('cell_list',
+    PStruct('cell',
+        PUUID('uuid'),
+        PFCellState,
+    ),
+)
+
+PFRowList = PList('row_list',
+    PStruct('row',
+        PNumber('offset'),
+        PFCellList,
+    ),
+)
+
+PFHistoryList = PList('history_list',
+    PStruct('history_entry',
+        PTID('serial'),
+        PNumber('size'),
+    ),
+)
+
+PFUUIDList = PList('uuid_list',
+    PUUID('uuid'),
+)
+
+PFTidList = PList('tid_list',
+    PTID('tid'),
+)
+
+PFOidList = PList('oid_list',
+    POID('oid'),
+)
 
-class AskPrimary(Packet):
+# packets definition
+
+class Notify(Packet):
     """
-    Ask a current primary master node. This must be the second message when
-    connecting to a master node. Any -> M.
+        General purpose notification (remote logging)
     """
-    pass
+    _fmt = PStruct('notify',
+        PString('message'),
+    )
+
+class Error(Packet):
+    """
+    Error is a special type of message, because this can be sent against
+    any other message, even if such a message does not expect a reply
+    usually. Any -> Any.
+    """
+    _fmt = PStruct('error',
+        PNumber('code'),
+        PString('message'),
+    )
+
+class Ping(Packet):
+    """
+    Check if a peer is still alive. Any -> Any.
+    """
+    _answer = PFEmpty
 
-class AnswerPrimary(Packet):
+class RequestIdentification(Packet):
+    """
+    Request a node identification. This must be the first packet for any
+    connection. Any -> Any.
     """
+
+    _fmt = PStruct('request_identification',
+        PProtocol('protocol_version'),
+        PFNodeType,
+        PUUID('uuid'),
+        PAddress('address'),
+        PString('name'),
+    )
+
+    _answer = PStruct('accept_identification',
+        PFNodeType,
+        PUUID('my_uuid'),
+        PNumber('num_partitions'),
+        PNumber('num_replicas'),
+        PUUID('your_uuid'),
+    )
+
+    def __init__(self, *args, **kw):
+        if args or kw:
+            # always announce current protocol version
+            args = list(args)
+            args.insert(0, PROTOCOL_VERSION)
+        super(RequestIdentification, self).__init__(*args, **kw)
+
+class PrimaryMaster(Packet):
+    """
+    Ask a current primary master node. This must be the second message when
+    connecting to a master node. Any -> M.
     Reply to Ask Primary Master. This message includes a list of known master
     nodes to make sure that a peer has the same information. M -> Any.
     """
-    _header_format = '!16sL'
-    _list_entry_format = '!6s16s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, primary_uuid, known_master_list):
-        primary_uuid = _encodeUUID(primary_uuid)
-        body = [pack(self._header_format, primary_uuid,
-            len(known_master_list))]
-        for address, uuid in known_master_list:
-            uuid = _encodeUUID(uuid)
-            address = _encodeAddress(address)
-            body.append(pack(self._list_entry_format, address, uuid))
-        return ''.join(body)
-
-    def _decode(self, body):
-        packet_offset = self._header_len
-        (primary_uuid, n) = unpack(self._header_format,
-            body[:packet_offset])
-        known_master_list = []
-        list_entry_len = self._list_entry_len
-        list_entry_format = self._list_entry_format
-        for _ in xrange(n):
-            next_packet_offset = packet_offset + list_entry_len
-            address, uuid = unpack(list_entry_format,
-                body[packet_offset:next_packet_offset])
-            packet_offset = next_packet_offset
-            address = _decodeAddress(address)
-            uuid = _decodeUUID(uuid)
-            known_master_list.append((address, uuid))
-        primary_uuid = _decodeUUID(primary_uuid)
-        return (primary_uuid, known_master_list)
+    _answer = PStruct('answer_primary',
+        PUUID('primary_uuid'),
+        PList('known_master_list',
+            PStruct('master',
+                PAddress('address'),
+                PUUID('uuid'),
+            ),
+        ),
+    )
 
 class AnnouncePrimary(Packet):
     """
     Announce a primary master node election. PM -> SM.
     """
-    pass
 
 class ReelectPrimary(Packet):
     """
     Force a re-election of a primary master node. M -> M.
     """
-    pass
 
-class AskLastIDs(Packet):
+class LastIDs(Packet):
     """
     Ask the last OID, the last TID and the last Partition Table ID that
     a storage node stores. Used to recover information. PM -> S, S -> PM.
-    """
-    pass
-
-class AnswerLastIDs(Packet):
-    """
     Reply to Ask Last IDs. S -> PM, PM -> S.
     """
-    def _encode(self, loid, ltid, lptid):
-        # in this case, loid is a valid OID but considered as invalid. This is
-        # not an issue because the OID 0 is hard coded and will never be
-        # generated
-        if loid is None:
-            loid = INVALID_OID
-        ltid = _encodeTID(ltid)
-        lptid = _encodePTID(lptid)
-        return loid + ltid + lptid
-
-    def _decode(self, body):
-        (loid, ltid, lptid) = unpack('!8s8s8s', body)
-        if loid == INVALID_OID:
-            loid = None
-        ltid = _decodeTID(ltid)
-        lptid = _decodePTID(lptid)
-        return (loid, ltid, lptid)
+    _answer = PStruct('answer_last_ids',
+        POID('last_oid'),
+        PTID('last_tid'),
+        PPTID('last_ptid'),
+    )
 
-class AskPartitionTable(Packet):
+class PartitionTable(Packet):
     """
     Ask the full partition table. PM -> S.
-    """
-    pass
-
-class AnswerPartitionTable(Packet):
-    """
     Answer rows in a partition table. S -> PM.
     """
-    _header_format = '!8sL'
-    _row_entry_format = '!LL'
-    _row_entry_len = calcsize(_row_entry_format)
-    _cell_entry_format = '!16sH'
-    _cell_entry_len = calcsize(_cell_entry_format)
-
-    def _encode(self, ptid, row_list):
-        ptid = _encodePTID(ptid)
-        body = [pack(self._header_format, ptid, len(row_list))]
-        row_entry_format = self._row_entry_format
-        cell_entry_format = self._cell_entry_format
-        for offset, cell_list in row_list:
-            body.append(pack(row_entry_format, offset, len(cell_list)))
-            for uuid, state in cell_list:
-                uuid = _encodeUUID(uuid)
-                body.append(pack(cell_entry_format, uuid, state))
-        return ''.join(body)
-
-    def _decode(self, body):
-        index = self._header_len
-        (ptid, n) = unpack(self._header_format, body[:index])
-        ptid = _decodePTID(ptid)
-        row_list = []
-        cell_list = []
-        row_entry_format = self._row_entry_format
-        row_entry_len = self._row_entry_len
-        cell_entry_format = self._cell_entry_format
-        cell_entry_len = self._cell_entry_len
-        for _ in xrange(n):
-            next_index = index + row_entry_len
-            offset, m = unpack(row_entry_format, body[index:next_index])
-            index = next_index
-            for _ in xrange(m):
-                next_index = index + cell_entry_len
-                uuid, state = unpack(cell_entry_format, body[index:next_index])
-                index = next_index
-                state = CellStates.get(state)
-                uuid = _decodeUUID(uuid)
-                cell_list.append((uuid, state))
-            row_list.append((offset, tuple(cell_list)))
-            del cell_list[:]
-        return (ptid, row_list)
+    _answer = PStruct('answer_partition_table',
+        PPTID('ptid'),
+        PFRowList,
+    )
 
-class SendPartitionTable(Packet):
+class NotifyPartitionTable(Packet):
     """
     Send rows in a partition table to update other nodes. PM -> S, C.
     """
-    _header_format = '!8sL'
-    _row_entry_format = '!LL'
-    _row_entry_len = calcsize(_row_entry_format)
-    _cell_entry_format = '!16sH'
-    _cell_entry_len = calcsize(_cell_entry_format)
-
-    def _encode(self, ptid, row_list):
-        ptid = _encodePTID(ptid)
-        body = [pack(self._header_format, ptid, len(row_list))]
-        row_entry_format = self._row_entry_format
-        cell_entry_format = self._cell_entry_format
-        for offset, cell_list in row_list:
-            body.append(pack(row_entry_format, offset, len(cell_list)))
-            for uuid, state in cell_list:
-                uuid = _encodeUUID(uuid)
-                body.append(pack(cell_entry_format, uuid, state))
-        return ''.join(body)
-
-    def _decode(self, body):
-        index = self._header_len
-        (ptid, n,) = unpack(self._header_format, body[:index])
-        ptid = _decodePTID(ptid)
-        row_list = []
-        cell_list = []
-        row_entry_format = self._row_entry_format
-        row_entry_len = self._row_entry_len
-        cell_entry_format = self._cell_entry_format
-        cell_entry_len = self._cell_entry_len
-        for _ in xrange(n):
-            next_index = index + row_entry_len
-            offset, m = unpack(row_entry_format, body[index:next_index])
-            index = next_index
-            for _ in xrange(m):
-                next_index = index + cell_entry_len
-                uuid, state = unpack(cell_entry_format, body[index:next_index])
-                index = next_index
-                state = CellStates.get(state)
-                uuid = _decodeUUID(uuid)
-                cell_list.append((uuid, state))
-            row_list.append((offset, tuple(cell_list)))
-            del cell_list[:]
-        return (ptid, row_list)
+    _fmt = PStruct('send_partition_table',
+        PPTID('ptid'),
+        PFRowList,
+    )
 
-class NotifyPartitionChanges(Packet):
+class PartitionChanges(Packet):
     """
     Notify a subset of a partition table. This is used to notify changes.
     PM -> S, C.
     """
-    _header_format = '!8sL'
-    _list_entry_format = '!L16sH'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, ptid, cell_list):
-        ptid = _encodePTID(ptid)
-        body = [pack(self._header_format, ptid, len(cell_list))]
-        list_entry_format = self._list_entry_format
-        for offset, uuid, state in cell_list:
-            uuid = _encodeUUID(uuid)
-            body.append(pack(list_entry_format, offset, uuid, state))
-        return ''.join(body)
-
-    def _decode(self, body):
-        packet_offset = self._header_len
-        (ptid, n) = unpack(self._header_format, body[:packet_offset])
-        ptid = _decodePTID(ptid)
-        cell_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_packet_offset = packet_offset + list_entry_len
-            (offset, uuid, state) = unpack(list_entry_format,
-                body[packet_offset:next_packet_offset])
-            packet_offset = next_packet_offset
-            state = CellStates.get(state)
-            uuid = _decodeUUID(uuid)
-            cell_list.append((offset, uuid, state))
-        return (ptid, cell_list)
+    _fmt = PStruct('notify_partition_changes',
+        PPTID('ptid'),
+        PList('cell_list',
+            PStruct('cell',
+                PNumber('offset'),
+                PUUID('uuid'),
+                PFNodeState,
+            ),
+        ),
+    )
 
-class NotifyReplicationDone(Packet):
+class ReplicationDone(Packet):
     """
     Notify the master node that a partition has been successully replicated from
     a storage to another.
     S -> M
     """
-    _header_format = '!L'
-
-    def _encode(self, offset):
-        return pack(self._header_format, offset)
-
-    def _decode(self, body):
-        (offset, ) = unpack(self._header_format, body)
-        return (offset, )
+    _fmt = PStruct('notify_replication_done',
+        PNumber('offset'),
+    )
 
 class StartOperation(Packet):
     """
     Tell a storage nodes to start an operation. Until a storage node receives
     this message, it must not serve client nodes. PM -> S.
     """
-    pass
 
 class StopOperation(Packet):
     """
     Tell a storage node to stop an operation. Once a storage node receives
     this message, it must not serve client nodes. PM -> S.
     """
-    pass
 
-class AskUnfinishedTransactions(Packet):
+class UnfinishedTransactions(Packet):
     """
     Ask unfinished transactions  PM -> S.
-    """
-    pass
-
-class AnswerUnfinishedTransactions(Packet):
-    """
     Answer unfinished transactions  S -> PM.
     """
-    _header_format = '!8sL'
-    _list_entry_format = '8s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, max_tid, tid_list):
-        body = [pack(self._header_format, max_tid, len(tid_list))]
-        body.extend(tid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (max_tid, n) = unpack(self._header_format, body[:offset])
-        tid_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            tid = unpack(list_entry_format, body[offset:next_offset])[0]
-            offset = next_offset
-            tid_list.append(tid)
-        return (max_tid, tid_list)
+    _answer = PStruct('answer_unfinished_transactions',
+        PTID('max_tid'),
+        PList('tid_list',
+            PTID('unfinished_tid'),
+        ),
+    )
 
-class AskObjectPresent(Packet):
+class ObjectPresent(Packet):
     """
     Ask if an object is present. If not present, OID_NOT_FOUND should be
     returned. PM -> S.
-    """
-    def _decode(self, body):
-        (oid, tid) = unpack('8s8s', body)
-        return (oid, _decodeTID(tid))
-
-class AnswerObjectPresent(Packet):
-    """
     Answer that an object is present. PM -> S.
     """
-    def _decode(self, body):
-        (oid, tid) = unpack('8s8s', body)
-        return (oid, _decodeTID(tid))
+    _fmt = PStruct('object_present',
+        POID('oid'),
+        PTID('tid'),
+    )
+
+    _answer = PStruct('object_present',
+        POID('oid'),
+        PTID('tid'),
+    )
 
 class DeleteTransaction(Packet):
     """
     Delete a transaction. PM -> S.
     """
-    _header_format = '!8sL'
-    _list_entry_format = '8s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, tid, oid_list):
-        body = [pack(self._header_format, tid, len(oid_list))]
-        body.extend(oid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (tid, n) = unpack(self._header_format, body[:offset])
-        oid_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            oid = unpack(list_entry_format, body[offset:next_offset])[0]
-            offset = next_offset
-            oid_list.append(oid)
-        return (tid, oid_list)
+    _fmt = PStruct('delete_transaction',
+        PTID('tid'),
+        PFOidList,
+    )
 
 class CommitTransaction(Packet):
     """
     Commit a transaction. PM -> S.
     """
-    def _encode(self, tid):
-        return _encodeTID(tid)
-
-    def _decode(self, body):
-        (tid, ) = unpack('8s', body)
-        return (_decodeTID(tid), )
+    _fmt = PStruct('commit_transaction',
+        PTID('tid'),
+    )
 
-class AskBeginTransaction(Packet):
+class BeginTransaction(Packet):
     """
     Ask to begin a new transaction. C -> PM.
-    """
-    def _encode(self, tid):
-        return _encodeTID(tid)
-
-    def _decode(self, body):
-        return (_decodeTID(unpack('8s', body)[0]), )
-
-class AnswerBeginTransaction(Packet):
-    """
     Answer when a transaction begin, give a TID if necessary. PM -> C.
     """
-    def _encode(self, tid):
-        return _encodeTID(tid)
+    _fmt = PStruct('ask_begin_transaction',
+        PTID('tid'),
+    )
 
-    def _decode(self, body):
-        (tid, ) = unpack('8s', body)
-        return (tid, )
+    _answer = PStruct('answer_begin_transaction',
+        PTID('tid'),
+    )
 
-class AskFinishTransaction(Packet):
+class FinishTransaction(Packet):
     """
     Finish a transaction. C -> PM.
+    Answer when a transaction is finished. PM -> C.
     """
-    _header_format = '!8sL'
-    _list_entry_format = '8s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, tid, oid_list):
-        body = [pack(self._header_format, tid, len(oid_list))]
-        body.extend(oid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (tid, n) = unpack(self._header_format, body[:offset])
-        oid_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            oid = unpack(list_entry_format, body[offset:next_offset])[0]
-            offset = next_offset
-            oid_list.append(oid)
-        return (tid, oid_list)
+    _fmt = PStruct('ask_finish_transaction',
+        PTID('tid'),
+        PFOidList,
+    )
+
+    _answer = PStruct('answer_information_locked',
+        PTID('ttid'),
+        PTID('tid'),
+    )
 
 class NotifyTransactionFinished(Packet):
     """
     Notify that a transaction blocking a replication is now finished
     M -> S
     """
-    def _encode(self, ttid, max_tid):
-        return _encodeTID(ttid) + _encodeTID(max_tid)
-
-    def _decode(self, body):
-        (ttid, max_tid) = unpack('8s8s', body)
-        return (ttid, max_tid)
-
-class AnswerTransactionFinished(Packet):
-    """
-    Answer when a transaction is finished. PM -> C.
-    """
-    def _encode(self, ttid, tid):
-        return _encodeTID(ttid) + _encodeTID(tid)
-
-    def _decode(self, body):
-        (ttid, tid) = unpack('8s8s', body)
-        return (_decodeTID(ttid), _decodeTID(tid))
+    _fmt = PStruct('notify_transaction_finished',
+        PTID('ttid'),
+        PTID('max_tid'),
+    )
 
-class AskLockInformation(Packet):
+class LockInformation(Packet):
     """
     Lock information on a transaction. PM -> S.
-    """
-    # XXX: Identical to InvalidateObjects and AskFinishTransaction
-    _header_format = '!8s8sL'
-    _list_entry_format = '8s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, ttid, tid, oid_list):
-        body = [pack(self._header_format, ttid, tid, len(oid_list))]
-        body.extend(oid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (ttid, tid, n) = unpack(self._header_format, body[:offset])
-        oid_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            oid = unpack(list_entry_format, body[offset:next_offset])[0]
-            offset = next_offset
-            oid_list.append(oid)
-        return (ttid, tid, oid_list)
-
-class AnswerInformationLocked(Packet):
-    """
     Notify information on a transaction locked. S -> PM.
     """
-    def _encode(self, tid):
-        return _encodeTID(tid)
+    _fmt = PStruct('ask_lock_informations',
+        PTID('ttid'),
+        PTID('tid'),
+        PFOidList,
+    )
 
-    def _decode(self, body):
-        (tid, ) = unpack('8s', body)
-        return (_decodeTID(tid), )
+    _answer = PStruct('answer_information_locked',
+        PTID('tid'),
+    )
 
 class InvalidateObjects(Packet):
     """
     Invalidate objects. PM -> C.
     """
-    _header_format = '!8sL'
-    _list_entry_format = '8s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, tid, oid_list):
-        body = [pack(self._header_format, tid, len(oid_list))]
-        body.extend(oid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (tid, n) = unpack(self._header_format, body[:offset])
-        oid_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            oid = unpack(list_entry_format, body[offset:next_offset])[0]
-            offset = next_offset
-            oid_list.append(oid)
-        return (tid, oid_list)
+    _fmt = PStruct('ask_finish_transaction',
+        PTID('tid'),
+        PFOidList,
+    )
 
-class NotifyUnlockInformation(Packet):
+class UnlockInformation(Packet):
     """
     Unlock information on a transaction. PM -> S.
     """
-    def _encode(self, tid):
-        return _encodeTID(tid)
-
-    def _decode(self, body):
-        (tid, ) = unpack('8s', body)
-        return (_decodeTID(tid), )
+    _fmt = PStruct('notify_unlock_information',
+        PTID('tid'),
+    )
 
-class AskNewOIDs(Packet):
+class GenerateOIDs(Packet):
     """
     Ask new object IDs. C -> PM.
-    """
-    _header_format = '!H'
-
-    def _encode(self, num_oids):
-        return pack(self._header_format, num_oids)
-
-    def _decode(self, body):
-        return unpack(self._header_format, body) # num oids
-
-class AnswerNewOIDs(Packet):
-    """
     Answer new object IDs. PM -> C.
     """
-    _header_format = '!H'
-    _list_entry_format = '8s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, oid_list):
-        body = [pack(self._header_format, len(oid_list))]
-        body.extend(oid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (n,) = unpack(self._header_format, body[:offset])
-        oid_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            oid = unpack(list_entry_format, body[offset:next_offset])[0]
-            offset = next_offset
-            oid_list.append(oid)
-        return (oid_list,)
+    _fmt = PStruct('ask_new_oids',
+        PNumber('num_oids'),
+    )
 
-class AskStoreObject(Packet):
+    _answer = PStruct('answer_new_oids',
+        PFOidList,
+    )
+
+class StoreObject(Packet):
     """
     Ask to store an object. Send an OID, an original serial, a current
     transaction ID, and data. C -> S.
-    """
-    _header_format = '!8s8s8sBL8sB'
-
-    @profiler_decorator
-    def _encode(self, oid, serial, compression, checksum, data, data_serial,
-            tid, unlock):
-        if serial is None:
-            serial = INVALID_TID
-        if data_serial is None:
-            data_serial = INVALID_TID
-        unlock = unlock and 1 or 0
-        return pack(self._header_format, oid, serial, tid, compression,
-                          checksum, data_serial, unlock) + _encodeString(data)
-
-    def _decode(self, body):
-        header_len = self._header_len
-        r = unpack(self._header_format, body[:header_len])
-        oid, serial, tid, compression, checksum, data_serial, unlock = r
-        serial = _decodeTID(serial)
-        data_serial = _decodeTID(data_serial)
-        (data, _) = _decodeString(body, 'data', offset=header_len)
-        return (oid, serial, compression, checksum, data, data_serial, tid,
-            bool(unlock))
-
-class AnswerStoreObject(Packet):
-    """
     Answer if an object has been stored. If an object is in conflict,
     a serial of the conflicting transaction is returned. In this case,
     if this serial is newer than the current transaction ID, a client
     node must not try to resolve the conflict. S -> C.
     """
-    _header_format = '!B8s8s'
+    _fmt = PStruct('ask_store_object',
+        POID('oid'),
+        PTID('serial'),
+        PBoolean('compression'),
+        PNumber('checksum'),
+        PString('data'),
+        PTID('data_serial'),
+        PTID('tid'),
+        PBoolean('unlock'),
+    )
 
-    def _encode(self, conflicting, oid, serial):
-        if serial is None:
-            serial = INVALID_TID
-        return pack(self._header_format, conflicting, oid, serial)
-
-    def _decode(self, body):
-        (conflicting, oid, serial) = unpack(self._header_format, body)
-        return (conflicting, oid, serial)
+    _answer = PStruct('answer_store_object',
+        PBoolean('conflicting'),
+        POID('oid'),
+        PTID('serial'),
+    )
 
 class AbortTransaction(Packet):
     """
     Abort a transaction. C -> S, PM.
     """
-    def _decode(self, body):
-        (tid, ) = unpack('8s', body)
-        return (tid, )
+    _fmt = PStruct('abort_transaction',
+        PTID('tid'),
+    )
 
-class AskStoreTransaction(Packet):
+class StoreTransaction(Packet):
     """
     Ask to store a transaction. C -> S.
-    """
-    _header_format = '!8sLHHH'
-
-    def _encode(self, tid, user, desc, ext, oid_list):
-        lengths = (len(oid_list), len(user), len(desc), len(ext))
-        body = [pack(self._header_format, tid, *lengths)]
-        body.append(user)
-        body.append(desc)
-        body.append(ext)
-        body.extend(oid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        r = unpack(self._header_format, body[:self._header_len])
-        tid, oid_len, user_len, desc_len, ext_len = r
-        body = body[self._header_len:]
-        user = body[:user_len]
-        body = body[user_len:]
-        desc = body[:desc_len]
-        body = body[desc_len:]
-        ext = body[:ext_len]
-        body = body[ext_len:]
-        oid_list = []
-        for _ in xrange(oid_len):
-            (oid, ) = unpack('8s', body[:8])
-            body = body[8:]
-            oid_list.append(oid)
-        return (tid, user, desc, ext, oid_list)
-
-class AnswerStoreTransaction(Packet):
-    """
     Answer if transaction has been stored. S -> C.
     """
-    def _encode(self, tid):
-        return _encodeTID(tid)
+    _fmt = PStruct('ask_store_transaction',
+        PTID('tid'),
+        PString('user'),
+        PString('description'),
+        PString('extension'),
+        PFOidList,
+    )
 
-    def _decode(self, body):
-        (tid, ) = unpack('8s', body)
-        return (tid, )
+    _answer = PStruct('answer_store_transaction',
+        PTID('tid'),
+    )
 
-class AskObject(Packet):
+class GetObject(Packet):
     """
     Ask a stored object by its OID and a serial or a TID if given. If a serial
     is specified, the specified revision of an object will be returned. If
     a TID is specified, an object right before the TID will be returned. S,C -> S.
-    """
-    _header_format = '!8s8s8s'
-
-    def _encode(self, oid, serial, tid):
-        tid = _encodeTID(tid)
-        serial = _encodeTID(serial) # serial is the previous TID
-        return pack(self._header_format, oid, serial, tid)
-
-    def _decode(self, body):
-        (oid, serial, tid) = unpack(self._header_format, body)
-        if serial == INVALID_TID:
-            serial = None
-        tid = _decodeTID(tid)
-        return (oid, serial, tid)
-
-class AnswerObject(Packet):
-    """
     Answer the requested object. S -> C.
     """
-    _header_format = '!8s8s8s8sBL'
+    _fmt = PStruct('ask_object',
+        POID('oid'),
+        PTID('serial'),
+        PTID('tid'),
+    )
 
-    def _encode(self, oid, serial_start, serial_end, compression,
-            checksum, data, data_serial):
-        if serial_start is None:
-            serial_start = INVALID_TID
-        if serial_end is None:
-            serial_end = INVALID_TID
-        if data_serial is None:
-            data_serial = INVALID_TID
-        return pack(self._header_format, oid, serial_start, serial_end,
-            data_serial, compression, checksum) + _encodeString(data)
-
-    def _decode(self, body):
-        header_len = self._header_len
-        r = unpack(self._header_format, body[:header_len])
-        oid, serial_start, serial_end, data_serial, compression, checksum = r
-        if serial_end == INVALID_TID:
-            serial_end = None
-        if data_serial == INVALID_TID:
-            data_serial = None
-        (data, _) = _decodeString(body, 'data', offset=header_len)
-        return (oid, serial_start, serial_end, compression, checksum, data,
-            data_serial)
+    _answer = PStruct('answer_object',
+        POID('oid'),
+        PTID('serial_start'),
+        PTID('serial_end'),
+        PBoolean('compression'),
+        PNumber('checksum'),
+        PString('data'),
+        PTID('data_serial'),
+    )
 
-class AskTIDs(Packet):
+class TIDList(Packet):
     """
     Ask for TIDs between a range of offsets. The order of TIDs is descending,
     and the range is [first, last). C -> S.
-    """
-    _header_format = '!QQL'
-
-    def _encode(self, first, last, partition):
-        return pack(self._header_format, first, last, partition)
-
-    def _decode(self, body):
-        return unpack(self._header_format, body) # first, last, partition
-
-class AnswerTIDs(Packet):
-    """
     Answer the requested TIDs. S -> C.
     """
-    _header_format = '!L'
-    _list_entry_format = '8s'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, tid_list):
-        body = [pack(self._header_format, len(tid_list))]
-        body.extend(tid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (n, ) = unpack(self._header_format, body[:offset])
-        tid_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            tid = unpack(list_entry_format, body[offset:next_offset])[0]
-            offset = next_offset
-            tid_list.append(tid)
-        return (tid_list,)
+    _fmt = PStruct('ask_tids',
+        PIndex('first'),
+        PIndex('last'),
+        PNumber('partition'),
+    )
+
+    _answer = PStruct('answer_tids',
+        PFTidList,
+    )
 
-class AskTIDsFrom(Packet):
+class TIDListFrom(Packet):
     """
     Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
     S -> S.
-    """
-    _header_format = '!8s8sLL'
-    _list_entry_format = 'L'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, min_tid, max_tid, length, partition_list):
-        body = [pack(self._header_format, min_tid, max_tid, length,
-            len(partition_list))]
-        list_entry_format = self._list_entry_format
-        for partition in partition_list:
-            body.append(pack(list_entry_format, partition))
-        return ''.join(body)
-
-    def _decode(self, body):
-        body = StringIO(body)
-        read = body.read
-        header = unpack(self._header_format, read(self._header_len))
-        min_tid, max_tid, length, list_length = header
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        partition_list = []
-        for _ in xrange(list_length):
-            partition = unpack(list_entry_format, read(list_entry_len))[0]
-            partition_list.append(partition)
-        return (min_tid, max_tid, length, partition_list)
-
-class AnswerTIDsFrom(AnswerTIDs):
-    """
     Answer the requested TIDs. S -> S
     """
-    pass
+    _fmt = PStruct('tid_list_from',
+        PTID('min_tid'),
+        PTID('max_tid'),
+        PNumber('length'),
+        PList('partition_list',
+            PNumber('partition'),
+        ),
+    )
 
-class AskTransactionInformation(Packet):
-    """
-    Ask information about a transaction. Any -> S.
-    """
-    def _decode(self, body):
-        (tid, ) = unpack('8s', body)
-        return (tid, )
+    _answer = PStruct('answer_tids',
+        PFTidList,
+    )
 
-class AnswerTransactionInformation(Packet):
+class TransactionInformation(Packet):
     """
+    Ask information about a transaction. Any -> S.
     Answer information (user, description) about a transaction. S -> Any.
     """
-    _header_format = '!8sHHHBL'
+    _fmt = PStruct('ask_transaction_information',
+        PTID('tid'),
+    )
 
-    def _encode(self, tid, user, desc, ext, packed, oid_list):
-        packed = packed and 1 or 0
-        body = [pack(self._header_format, tid, len(user), len(desc), len(ext),
-            packed, len(oid_list))]
-        body.append(user)
-        body.append(desc)
-        body.append(ext)
-        body.extend(oid_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        r = unpack(self._header_format, body[:self._header_len])
-        tid, user_len, desc_len, ext_len, packed, oid_len = r
-        packed = bool(packed)
-        body = body[self._header_len:]
-        user = body[:user_len]
-        body = body[user_len:]
-        desc = body[:desc_len]
-        body = body[desc_len:]
-        ext = body[:ext_len]
-        body = body[ext_len:]
-        oid_list = []
-        for _ in xrange(oid_len):
-            (oid, ) = unpack('8s', body[:8])
-            body = body[8:]
-            oid_list.append(oid)
-        return (tid, user, desc, ext, packed, oid_list)
+    _answer = PStruct('answer_transaction_information',
+        PTID('tid'),
+        PString('user'),
+        PString('description'),
+        PString('extension'),
+        PBoolean('packed'),
+        PFOidList,
+    )
 
-class AskObjectHistory(Packet):
+class ObjectHistory(Packet):
     """
     Ask history information for a given object. The order of serials is
     descending, and the range is [first, last]. C -> S.
-    """
-    _header_format = '!8sQQ'
-
-    def _encode(self, oid, first, last):
-        return pack(self._header_format, oid, first, last)
-
-    def _decode(self, body):
-        (oid, first, last) = unpack(self._header_format, body)
-        return (oid, first, last)
-
-class AnswerObjectHistory(Packet):
-    """
     Answer history information (serial, size) for an object. S -> C.
     """
-    _header_format = '!8sL'
-    _list_entry_format = '!8sL'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, oid, history_list):
-        body = [pack(self._header_format, oid, len(history_list))]
-        list_entry_format = self._list_entry_format
-        for serial, size in history_list:
-            body.append(pack(list_entry_format, serial, size))
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (oid, length) = unpack(self._header_format, body[:offset])
-        history_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(length):
-            next_offset = offset + list_entry_len
-            serial, size = unpack(list_entry_format, body[offset:next_offset])
-            offset = next_offset
-            history_list.append((serial, size))
-        return (oid, history_list)
+    _fmt = PStruct('ask_object_history',
+        POID('oid'),
+        PIndex('first'),
+        PIndex('last'),
+    )
+
+    _answer = PStruct('answer_object_history',
+        POID('oid'),
+        PFHistoryList,
+    )
 
-class AskObjectHistoryFrom(Packet):
+class ObjectHistoryFrom(Packet):
     """
     Ask history information for a given object. The order of serials is
     ascending, and starts at (or above) min_serial for min_oid. S -> S.
-    """
-    _header_format = '!8s8s8sLL'
-
-    def _encode(self, min_oid, min_serial, max_serial, length, partition):
-        return pack(self._header_format, min_oid, min_serial, max_serial,
-            length, partition)
-
-    def _decode(self, body):
-        # min_oid, min_serial, length, partition
-        return unpack(self._header_format, body)
-
-class AnswerObjectHistoryFrom(Packet):
-    """
     Answer the requested serials. S -> S.
     """
-    _header_format = '!L'
-    _list_entry_format = '!8sL'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, object_dict):
-        body = [pack(self._header_format, len(object_dict))]
-        append = body.append
-        extend = body.extend
-        list_entry_format = self._list_entry_format
-        for oid, serial_list in object_dict.iteritems():
-            append(pack(list_entry_format, oid, len(serial_list)))
-            extend(serial_list)
-        return ''.join(body)
-
-    def _decode(self, body):
-        body = StringIO(body)
-        read = body.read
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        object_dict = {}
-        dict_len = unpack(self._header_format, read(self._header_len))[0]
-        for _ in xrange(dict_len):
-            oid, serial_len = unpack(list_entry_format, read(list_entry_len))
-            object_dict[oid] = [read(TID_LEN) for _ in xrange(serial_len)]
-        return (object_dict, )
+    _fmt = PStruct('ask_object_history',
+        POID('min_oid'),
+        PTID('min_serial'),
+        PTID('max_serial'),
+        PNumber('length'),
+        PNumber('partition'),
+    )
+
+    _answer = PStruct('ask_finish_transaction',
+        PDict('object_dict',
+            POID('oid'),
+            PFTidList,
+        ),
+    )
 
-class AskPartitionList(Packet):
+class PartitionList(Packet):
     """
     All the following messages are for neoctl to admin node
     Ask information about partition
-    """
-    _header_format = '!LL16s'
-
-    def _encode(self, min_offset, max_offset, uuid):
-        uuid = _encodeUUID(uuid)
-        body = [pack(self._header_format, min_offset, max_offset, uuid)]
-        return ''.join(body)
-
-    def _decode(self, body):
-        (min_offset, max_offset, uuid) =  unpack(self._header_format, body)
-        uuid = _decodeUUID(uuid)
-        return (min_offset, max_offset, uuid)
-
-class AnswerPartitionList(Packet):
-    """
     Answer information about partition
     """
-    _header_format = '!8sL'
-    _row_entry_format = '!LL'
-    _row_entry_len = calcsize(_row_entry_format)
-    _cell_entry_format = '!16sH'
-    _cell_entry_len = calcsize(_cell_entry_format)
-
-    def _encode(self, ptid, row_list):
-        ptid = _encodePTID(ptid)
-        body = [pack(self._header_format, ptid, len(row_list))]
-        row_entry_format = self._row_entry_format
-        cell_entry_format = self._cell_entry_format
-        for offset, cell_list in row_list:
-            body.append(pack(row_entry_format, offset, len(cell_list)))
-            for uuid, state in cell_list:
-                uuid = _encodeUUID(uuid)
-                body.append(pack(cell_entry_format, uuid, state))
-        return ''.join(body)
-
-    def _decode(self, body):
-        index = self._header_len
-        (ptid, n) = unpack(self._header_format, body[:index])
-        ptid = _decodePTID(ptid)
-        row_list = []
-        cell_list = []
-        row_entry_format = self._row_entry_format
-        row_entry_len = self._row_entry_len
-        cell_entry_format = self._cell_entry_format
-        cell_entry_len = self._cell_entry_len
-        for _ in xrange(n):
-            next_index = index + row_entry_len
-            offset, m = unpack(row_entry_format, body[index:next_index])
-            index = next_index
-            for _ in xrange(m):
-                next_index = index + cell_entry_len
-                uuid, state = unpack(cell_entry_format, body[index:next_index])
-                index = next_index
-                state = CellStates.get(state)
-                uuid = _decodeUUID(uuid)
-                cell_list.append((uuid, state))
-            row_list.append((offset, tuple(cell_list)))
-            del cell_list[:]
-        return (ptid, row_list)
-
-class AskNodeList(Packet):
-    """
-    Ask information about nodes
-    """
-    _header_format = '!H'
-
-    def _encode(self, node_type):
-        return ''.join([pack(self._header_format, node_type)])
+    _fmt = PStruct('ask_partition_list',
+        PNumber('min_offset'),
+        PNumber('max_offset'),
+        PUUID('uuid'),
+    )
 
-    def _decode(self, body):
-        (node_type, ) = unpack(self._header_format, body)
-        node_type = _decodeNodeType(node_type)
-        return (node_type,)
+    _answer = PStruct('answer_partition_list',
+        PPTID('ptid'),
+        PFRowList,
+    )
 
-class AnswerNodeList(Packet):
+class NodeList(Packet):
     """
+    Ask information about nodes
     Answer information about nodes
     """
-    _header_format = '!L'
-    _list_entry_format = '!H6s16sH'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, node_list):
-        body = [pack(self._header_format, len(node_list))]
-        list_entry_format = self._list_entry_format
-        for node_type, address, uuid, state in node_list:
-            uuid = _encodeUUID(uuid)
-            address = _encodeAddress(address)
-            body.append(pack(list_entry_format, node_type, address, uuid,
-                state))
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (n,) = unpack(self._header_format, body[:offset])
-        node_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            r = unpack(list_entry_format, body[offset:next_offset])
-            offset = next_offset
-            node_type, address, uuid, state = r
-            address = _decodeAddress(address)
-            node_type = _decodeNodeType(node_type)
-            state = _decodeNodeState(state)
-            uuid = _decodeUUID(uuid)
-            node_list.append((node_type, address, uuid, state))
-        return (node_list,)
+    _fmt = PStruct('ask_node_list',
+        PFNodeType,
+    )
+
+    _answer = PStruct('answer_node_list',
+        PFNodeList,
+    )
 
 class SetNodeState(Packet):
     """
     Set the node state
     """
-    _header_format = '!16sHB'
+    _fmt = PStruct('set_node_state',
+        PUUID('uuid'),
+        PFNodeState,
+        PBoolean('modify_partition_table'),
+    )
 
-    def _encode(self, uuid, state, modify_partition_table):
-        uuid = _encodeUUID(uuid)
-        return ''.join([pack(self._header_format, uuid, state,
-            modify_partition_table)])
-
-    def _decode(self, body):
-        (uuid, state, modify) = unpack(self._header_format, body)
-        state = _decodeNodeState(state)
-        uuid = _decodeUUID(uuid)
-        return (uuid, state, modify)
+    _answer = Error
 
 class AddPendingNodes(Packet):
     """
     Ask the primary to include some pending node in the partition table
     """
-    _header_format = '!H'
-    _list_header_format = '!16s'
-    _list_header_len = calcsize(_list_header_format)
-
-    def _encode(self, uuid_list=()):
-        list_header_format = self._list_header_format
-        # an empty list means all current pending nodes
-        uuid_list = [pack(list_header_format, _encodeUUID(uuid)) \
-            for uuid in uuid_list]
-        return pack(self._header_format, len(uuid_list)) + ''.join(uuid_list)
-
-    def _decode(self, body):
-        header_len = self._header_len
-        (n, ) = unpack(self._header_format, body[:header_len])
-        list_header_format = self._list_header_format
-        list_header_len = self._list_header_len
-        uuid_list = [unpack(list_header_format,
-            body[header_len+i*list_header_len:\
-                 header_len+(i+1)*list_header_len])[0] for i in xrange(n)]
-        uuid_list = [_decodeUUID(x) for x in uuid_list]
-        return (uuid_list, )
+    _fmt = PStruct('add_pending_nodes',
+        PFUUIDList,
+    )
+
+    _answer = Error
 
 class NotifyNodeInformation(Packet):
     """
     Notify information about one or more nodes. PM -> Any.
     """
-    _header_format = '!L'
-    _list_entry_format = '!H6s16sH'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, node_list):
-        body = [pack(self._header_format, len(node_list))]
-        list_entry_format = self._list_entry_format
-        for node_type, address, uuid, state in node_list:
-            uuid = _encodeUUID(uuid)
-            address = _encodeAddress(address)
-            body.append(pack(list_entry_format, node_type, address, uuid,
-                state))
-        return ''.join(body)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (n,) = unpack(self._header_format, body[:offset])
-        node_list = []
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        for _ in xrange(n):
-            next_offset = offset + list_entry_len
-            r = unpack(list_entry_format, body[offset:next_offset])
-            offset = next_offset
-            node_type, address, uuid, state = r
-            address = _decodeAddress(address)
-            node_type = _decodeNodeType(node_type)
-            state = _decodeNodeState(state)
-            uuid = _decodeUUID(uuid)
-            node_list.append((node_type, address, uuid, state))
-        return (node_list,)
+    _fmt = PStruct('notify_node_informations',
+        PFNodeList,
+    )
 
-class AskNodeInformation(Packet):
+class NodeInformation(Packet):
     """
     Ask node information
     """
-    pass
-
-class AnswerNodeInformation(Packet):
-    """
-    Answer node information
-    """
-    pass
+    _answer = PFEmpty
 
 class SetClusterState(Packet):
     """
     Set the cluster state
     """
-    _header_format = '!H'
-
-    def _encode(self, state):
-        return pack(self._header_format, state)
+    _fmt = PStruct('set_cluster_state',
+        PEnum('state', ClusterStates),
+    )
 
-    def _decode(self, body):
-        (state, ) = unpack(self._header_format, body[:self._header_len])
-        state = _decodeClusterState(state)
-        return (state, )
+    _answer = Error
 
-class NotifyClusterInformation(Packet):
+class ClusterInformation(Packet):
     """
     Notify information about the cluster
     """
-    _header_format = '!H'
-
-    def _encode(self, state):
-        return pack(self._header_format, state)
-
-    def _decode(self, body):
-        (state, ) = unpack(self._header_format, body)
-        state = _decodeClusterState(state)
-        return (state, )
+    _fmt = PStruct('notify_cluster_information',
+        PEnum('state', ClusterStates),
+    )
 
-class AskClusterState(Packet):
+class ClusterState(Packet):
     """
     Ask state of the cluster
-    """
-    pass
-
-class AnswerClusterState(Packet):
-    """
     Answer state of the cluster
     """
-    _header_format = '!H'
 
-    def _encode(self, state):
-        return pack(self._header_format, state)
-
-    def _decode(self, body):
-        (state, ) = unpack(self._header_format, body)
-        state = _decodeClusterState(state)
-        return (state, )
+    _answer = PStruct('answer_cluster_state',
+        PEnum('state', ClusterStates),
+    )
 
 class NotifyLastOID(Packet):
     """
     Notify last OID generated
     """
-    def _decode(self, body):
-        (loid, ) = unpack('8s', body)
-        return (loid, )
+    _fmt = PStruct('notify_last_oid',
+        POID('last_oid'),
+    )
 
-class AskObjectUndoSerial(Packet):
+class ObjectUndoSerial(Packet):
     """
     Ask storage the serial where object data is when undoing given transaction,
     for a list of OIDs.
     C -> S
-    """
-    _header_format = '!8s8s8sL'
-
-    def _encode(self, tid, ltid, undone_tid, oid_list):
-        body = StringIO()
-        write = body.write
-        write(pack(self._header_format, tid, ltid, undone_tid, len(oid_list)))
-        for oid in oid_list:
-            write(oid)
-        return body.getvalue()
-
-    def _decode(self, body):
-        body = StringIO(body)
-        read = body.read
-        tid, ltid, undone_tid, oid_list_len = unpack(self._header_format,
-            read(self._header_len))
-        oid_list = [read(8) for _ in xrange(oid_list_len)]
-        return tid, ltid, undone_tid, oid_list
-
-class AnswerObjectUndoSerial(Packet):
-    """
     Answer serials at which object data is when undoing a given transaction.
     object_tid_dict has the following format:
         key: oid
@@ -1569,207 +1145,154 @@ class AnswerObjectUndoSerial(Packet):
                 If current_serial's data is current on storage.
     S -> C
     """
-    _header_format = '!L'
-    _list_entry_format = '!8s8s8sB'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, object_tid_dict):
-        body = StringIO()
-        write = body.write
-        write(pack(self._header_format, len(object_tid_dict)))
-        list_entry_format = self._list_entry_format
-        for oid, (current_serial, undo_serial, is_current) in \
-                object_tid_dict.iteritems():
-            if undo_serial is None:
-                undo_serial = ZERO_TID
-            write(pack(list_entry_format, oid, current_serial, undo_serial,
-                is_current))
-        return body.getvalue()
-
-    def _decode(self, body):
-        body = StringIO(body)
-        read = body.read
-        object_tid_dict = {}
-        list_entry_format = self._list_entry_format
-        list_entry_len = self._list_entry_len
-        object_tid_len = unpack(self._header_format, read(self._header_len))[0]
-        for _ in xrange(object_tid_len):
-            oid, current_serial, undo_serial, is_current = unpack(
-                list_entry_format, read(list_entry_len))
-            if undo_serial == ZERO_TID:
-                undo_serial = None
-            object_tid_dict[oid] = (current_serial, undo_serial,
-                bool(is_current))
-        return (object_tid_dict, )
+    _fmt = PStruct('ask_undo_transaction',
+        PTID('tid'),
+        PTID('ltid'),
+        PTID('undone_tid'),
+        PFOidList,
+    )
+
+    _answer = PStruct('answer_undo_transaction',
+        PDict('object_tid_dict',
+            POID('oid'),
+            PStruct('object_tid_value',
+                PTID('current_serial'),
+                PTID('undo_serial'),
+                PBoolean('is_current'),
+            ),
+        ),
+    )
 
-class AskHasLock(Packet):
+class HasLock(Packet):
     """
     Ask a storage is oid is locked by another transaction.
     C -> S
-    """
-    def _encode(self, tid, oid):
-        return _encodeTID(tid) + _encodeTID(oid)
-
-    def _decode(self, body):
-        return (_decodeTID(body[:8]), _decodeTID(body[8:]))
-
-class AnswerHasLock(Packet):
-    """
     Answer whether a transaction holds the write lock for requested object.
     """
-    _header_format = '!8sH'
-
-    def _encode(self, oid, state):
-        return pack(self._header_format, oid, state)
+    _fmt = PStruct('has_load_lock',
+        PTID('tid'),
+        POID('oid'),
+    )
 
-    def _decode(self, body):
-        oid, state = unpack(self._header_format, body)
-        return (oid, _decodeLockState(state))
+    _answer = PStruct('answer_has_lock',
+        POID('oid'),
+        PEnum('lock_state', LockState),
+    )
 
-class AskCheckCurrentSerial(Packet):
+class CheckCurrentSerial(Packet):
     """
     Verifies if given serial is current for object oid in the database, and
     take a write lock on it (so that this state is not altered until
     transaction ends).
-    """
-    _header_format = '!8s8s8s'
-
-    def _encode(self, tid, serial, oid):
-        return tid + serial + oid
-
-    def _decode(self, body):
-        return unpack(self._header_format, body)
-
-class AnswerCheckCurrentSerial(AnswerStoreObject):
-    """
     Answer to AskCheckCurrentSerial.
     Same structure as AnswerStoreObject, to handle the same way, except there
     is nothing to invalidate in any client's cache.
     """
-    pass
+    _fmt = PStruct('ask_check_current_serial',
+        PTID('tid'),
+        PTID('serial'),
+        POID('oid'),
+    )
+
+    _answer = PStruct('answer_store_object',
+        PBoolean('conflicting'),
+        POID('oid'),
+        PTID('serial'),
+    )
 
-class AskBarrier(Packet):
+class Barrier(Packet):
     """
     Initates a "network barrier", allowing the node sending this packet to know
     when all packets sent previously on the same connection have been handled
     by its peer.
     """
-    pass
 
-class AnswerBarrier(Packet):
-    pass
+    _answer = PFEmpty
 
-class AskPack(Packet):
+class Pack(Packet):
     """
     Request a pack at given TID.
     C -> M
     M -> S
-    """
-    def _encode(self, tid):
-        return _encodeTID(tid)
-
-    def _decode(self, body):
-        return (_decodeTID(body), )
-
-class AnswerPack(Packet):
-    """
     Inform that packing it over.
     S -> M
     M -> C
     """
-    _header_format = '!H'
-
-    def _encode(self, status):
-        return pack(self._header_format, int(status))
+    _fmt = PStruct('ask_pack',
+        PTID('tid'),
+    )
 
-    def _decode(self, body):
-        return (bool(unpack(self._header_format, body)[0]), )
+    _answer = PStruct('answer_pack',
+        PBoolean('status'),
+    )
 
-class AskCheckTIDRange(Packet):
+class CheckTIDRange(Packet):
     """
     Ask some stats about a range of transactions.
     Used to know if there are differences between a replicating node and
     reference node.
     S -> S
-    """
-    _header_format = '!8s8sLL'
-
-    def _encode(self, min_tid, max_tid, length, partition):
-        return pack(self._header_format, min_tid, max_tid, length, partition)
-
-    def _decode(self, body):
-        # min_tid, max_tid, length, partition
-        return unpack(self._header_format, body)
-
-class AnswerCheckTIDRange(Packet):
-    """
     Stats about a range of transactions.
     Used to know if there are differences between a replicating node and
     reference node.
     S -> S
     """
-    _header_format = '!8sLLQ8s'
-    def _encode(self, min_tid, length, count, tid_checksum, max_tid):
-        return pack(self._header_format, min_tid, length, count, tid_checksum,
-            max_tid)
-
-    def _decode(self, body):
-        # min_tid, length, partition, count, tid_checksum, max_tid
-        return unpack(self._header_format, body)
+    _fmt = PStruct('ask_check_tid_range',
+        PTID('min_tid'),
+        PTID('max_tid'),
+        PNumber('length'),
+        PNumber('partition'),
+    )
+
+    _answer = PStruct('answer_check_tid_range',
+        PTID('min_tid'),
+        PNumber('length'),
+        PNumber('count'),
+        PChecksum('checksum'),
+        PTID('max_tid'),
+    )
 
-class AskCheckSerialRange(Packet):
+class CheckSerialRange(Packet):
     """
     Ask some stats about a range of object history.
     Used to know if there are differences between a replicating node and
     reference node.
     S -> S
-    """
-    _header_format = '!8s8s8sLL'
-
-    def _encode(self, min_oid, min_serial, max_tid, length, partition):
-        return pack(self._header_format, min_oid, min_serial, max_tid, length,
-            partition)
-
-    def _decode(self, body):
-        # min_oid, min_serial, max_tid, length, partition
-        return unpack(self._header_format, body)
-
-class AnswerCheckSerialRange(Packet):
-    """
     Stats about a range of object history.
     Used to know if there are differences between a replicating node and
     reference node.
     S -> S
     """
-    _header_format = '!8s8sLLQ8sQ8s'
+    _fmt = PStruct('ask_check_serial_range',
+        POID('min_oid'),
+        PTID('min_serial'),
+        PTID('max_tid'),
+        PNumber('length'),
+        PNumber('partition'),
+    )
 
-    def _encode(self, min_oid, min_serial, length, count, oid_checksum,
-            max_oid, serial_checksum, max_serial):
-        return pack(self._header_format, min_oid, min_serial, length, count,
-            oid_checksum, max_oid, serial_checksum, max_serial)
-
-    def _decode(self, body):
-        # min_oid, min_serial, length, count, oid_checksum, max_oid,
-        # serial_checksum, max_serial
-        return unpack(self._header_format, body)
+    _answer = PStruct('answer_check_serial_range',
+        POID('min_oid'),
+        PTID('min_serial'),
+        PNumber('length'),
+        PNumber('count'),
+        PChecksum('oid_checksum'),
+        POID('max_oid'),
+        PChecksum('serial_checksum'),
+        PTID('max_serial'),
+    )
 
-class AskLastTransaction(Packet):
+class LastTransaction(Packet):
     """
     Ask last committed TID.
     C -> M
-    """
-    pass
-
-class AnswerLastTransaction(Packet):
-    """
     Answer last committed TID.
     M -> C
     """
-    def _encode(self, tid):
-        return tid
 
-    def _decode(self, body):
-        return (body, )
+    _answer = PStruct('answer_last_transaction',
+        PTID('tid'),
+    )
 
 class NotifyReady(Packet):
     """
@@ -1778,38 +1301,12 @@ class NotifyReady(Packet):
     """
     pass
 
-class Error(Packet):
-    """
-    Error is a special type of message, because this can be sent against
-    any other message, even if such a message does not expect a reply
-    usually. Any -> Any.
-    """
-    _header_format = '!H'
-
-    def _encode(self, code, message):
-        return pack(self._header_format, code) + _encodeString(message)
-
-    def _decode(self, body):
-        offset = self._header_len
-        (code, ) = unpack(self._header_format, body[:offset])
-        code = _decodeErrorCode(code)
-        (message, _) = _decodeString(body, 'message', offset=offset)
-        return (code, message)
-
-
-def initMessage(klass):
-    if klass._header_format is not None:
-        klass._header_len = calcsize(klass._header_format)
-
 StaticRegistry = {}
 def register(code, request, answer=None, ignore_when_closed=None):
     """ Register a packet in the packet registry """
     # register the request
-    # assert code & RESPONSE_MASK == 0
     assert code not in StaticRegistry, "Duplicate request packet code"
-    initMessage(request)
     request._code = code
-    request._answer = answer
     StaticRegistry[code] = request
     if ignore_when_closed is None:
         # By default, on a closed connection:
@@ -1818,17 +1315,21 @@ def register(code, request, answer=None,
         # - nofitication: keep
         ignore_when_closed = answer is not None
     request._ignore_when_closed = ignore_when_closed
-    if answer not in (None, Error):
-        initMessage(answer)
-        # compute the answer code
-        code = code | RESPONSE_MASK
-        answer._request = request
-        answer._code = code
-        # and register the answer packet
-        assert code not in StaticRegistry, "Duplicate response packet code"
-        StaticRegistry[code] = answer
-        return (request, answer)
-    return request
+    if request._answer in (Error, None):
+        return request
+    # build a class for the answer
+    answer = ClassType('Answer%s' % (request.__name__, ), (Packet, ), {})
+    answer._fmt = request._answer
+    # compute the answer code
+    code = code | RESPONSE_MASK
+    answer._request = request
+    assert answer._code is None, "Answer of %s is already used" % (request, )
+    answer._code = code
+    request._answer = answer
+    # and register the answer packet
+    assert code not in StaticRegistry, "Duplicate response packet code"
+    StaticRegistry[code] = answer
+    return (request, answer)
 
 class ParserState(object):
     """
@@ -1850,7 +1351,6 @@ class PacketRegistry(dict):
     """
     Packet registry that check packet code unicity and provide an index
     """
-
     def __init__(self):
         dict.__init__(self)
         # load packet classes
@@ -1859,10 +1359,10 @@ class PacketRegistry(dict):
     def parse(self, buf, state_container):
         state = state_container.get()
         if state is None:
-            header = buf.read(PACKET_HEADER_SIZE)
+            header = buf.read(PACKET_HEADER_FORMAT.size)
             if header is None:
                 return None
-            msg_id, msg_type, msg_len = unpack(PACKET_HEADER_FORMAT, header)
+            msg_id, msg_type, msg_len = PACKET_HEADER_FORMAT.unpack(header)
             try:
                 packet_klass = self[msg_type]
             except KeyError:
@@ -1871,7 +1371,7 @@ class PacketRegistry(dict):
                 raise PacketMalformedError('message too big (%d)' % msg_len)
             if msg_len < MIN_PACKET_SIZE:
                 raise PacketMalformedError('message too small (%d)' % msg_len)
-            msg_len -= PACKET_HEADER_SIZE
+            msg_len -= PACKET_HEADER_FORMAT.size
         else:
             msg_id, packet_klass, msg_len = state
         data = buf.read(msg_len)
@@ -1886,180 +1386,113 @@ class PacketRegistry(dict):
         packet.setContent(msg_id, data)
         return packet
 
-    # packets registration
-    Error = register(0x8000, Error)
-    Notify = register(0x0032, Notify)
+    # notifications
+    Error = register(
+            0x8000, Error)
     Ping, Pong = register(
-            0x0001,
-            Ping,
-            Pong)
+            0x0001, Ping)
+    Notify = register(
+            0x0002, Notify)
     RequestIdentification, AcceptIdentification = register(
-            0x0002,
-            RequestIdentification,
-            AcceptIdentification)
+            0x0003, RequestIdentification)
     AskPrimary, AnswerPrimary = register(
-            0x0003,
-            AskPrimary,
-            AnswerPrimary)
-    AnnouncePrimary = register(0x0004, AnnouncePrimary)
-    ReelectPrimary = register(0x0005, ReelectPrimary)
-    NotifyNodeInformation = register(0x0006, NotifyNodeInformation)
+            0x0004, PrimaryMaster)
+    AnnouncePrimary = register(
+            0x0005, AnnouncePrimary)
+    ReelectPrimary = register(
+            0x0006, ReelectPrimary)
+    NotifyNodeInformation = register(
+            0x0007, NotifyNodeInformation)
     AskLastIDs, AnswerLastIDs = register(
-            0x0007,
-            AskLastIDs,
-            AnswerLastIDs)
+            0x0008, LastIDs)
     AskPartitionTable, AnswerPartitionTable = register(
-            0x0008,
-            AskPartitionTable,
-            AnswerPartitionTable)
-    SendPartitionTable = register(0x0009, SendPartitionTable)
-    NotifyPartitionChanges = register(0x000A, NotifyPartitionChanges)
-    StartOperation = register(0x000B, StartOperation)
-    StopOperation = register(0x000C, StopOperation)
+            0x0009, PartitionTable)
+    SendPartitionTable = register(
+            0x000A, NotifyPartitionTable)
+    NotifyPartitionChanges = register(
+            0x000B, PartitionChanges)
+    StartOperation = register(
+            0x000C, StartOperation)
+    StopOperation = register(
+            0x000D, StopOperation)
     AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
-            0x000D,
-            AskUnfinishedTransactions,
-            AnswerUnfinishedTransactions)
+            0x000E, UnfinishedTransactions)
     AskObjectPresent, AnswerObjectPresent = register(
-            0x000f,
-            AskObjectPresent,
-            AnswerObjectPresent)
-    DeleteTransaction = register(0x0010, DeleteTransaction)
-    CommitTransaction = register(0x0011, CommitTransaction)
+            0x000F, ObjectPresent)
+    DeleteTransaction = register(
+            0x0010, DeleteTransaction)
+    CommitTransaction = register(
+            0x0011, CommitTransaction)
     AskBeginTransaction, AnswerBeginTransaction = register(
-            0x0012,
-            AskBeginTransaction,
-            AnswerBeginTransaction)
+            0x0012, BeginTransaction)
     AskFinishTransaction, AnswerTransactionFinished = register(
-            0x0013,
-            AskFinishTransaction,
-            AnswerTransactionFinished,
-            ignore_when_closed=False,
-    )
+            0x0013, FinishTransaction, ignore_when_closed=False)
     AskLockInformation, AnswerInformationLocked = register(
-            0x0014,
-            AskLockInformation,
-            AnswerInformationLocked,
-    )
-    InvalidateObjects = register(0x0015, InvalidateObjects)
-    NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation)
+            0x0014, LockInformation, ignore_when_closed=False)
+    InvalidateObjects = register(
+            0x0015, InvalidateObjects)
+    NotifyUnlockInformation = register(
+            0x0016, UnlockInformation)
     AskNewOIDs, AnswerNewOIDs = register(
-            0x0017,
-            AskNewOIDs,
-            AnswerNewOIDs)
+            0x0017, GenerateOIDs)
     AskStoreObject, AnswerStoreObject = register(
-            0x0018,
-            AskStoreObject,
-            AnswerStoreObject)
-    AbortTransaction = register(0x0019, AbortTransaction)
+            0x0018, StoreObject)
+    AbortTransaction = register(
+            0x0019, AbortTransaction)
     AskStoreTransaction, AnswerStoreTransaction = register(
-            0x001A,
-            AskStoreTransaction,
-            AnswerStoreTransaction)
+            0x001A, StoreTransaction)
     AskObject, AnswerObject = register(
-            0x001B,
-            AskObject,
-            AnswerObject)
+            0x001B, GetObject)
     AskTIDs, AnswerTIDs = register(
-            0x001C,
-            AskTIDs,
-            AnswerTIDs)
+            0x001C, TIDList)
     AskTransactionInformation, AnswerTransactionInformation = register(
-            0x001E,
-            AskTransactionInformation,
-            AnswerTransactionInformation)
+            0x001D, TransactionInformation)
     AskObjectHistory, AnswerObjectHistory = register(
-            0x001F,
-            AskObjectHistory,
-            AnswerObjectHistory)
+            0x001E, ObjectHistory)
     AskPartitionList, AnswerPartitionList = register(
-            0x0021,
-            AskPartitionList,
-            AnswerPartitionList)
+            0x001F, PartitionList)
     AskNodeList, AnswerNodeList = register(
-            0x0022,
-            AskNodeList,
-            AnswerNodeList)
+            0x0020, NodeList)
     SetNodeState = register(
-            0x0023,
-            SetNodeState,
-            Error,
-            ignore_when_closed=False,
-    )
+            0x0021, SetNodeState, ignore_when_closed=False)
     AddPendingNodes = register(
-            0x0024,
-            AddPendingNodes,
-            Error,
-            ignore_when_closed=False,
-    )
+            0x0022, AddPendingNodes, ignore_when_closed=False)
     AskNodeInformation, AnswerNodeInformation = register(
-            0x0025,
-            AskNodeInformation,
-            AnswerNodeInformation)
+            0x0023, NodeInformation)
     SetClusterState = register(
-            0x0026,
-            SetClusterState,
-            Error,
-            ignore_when_closed=False,
-    )
-    NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
+            0x0024, SetClusterState, ignore_when_closed=False)
+    NotifyClusterInformation = register(
+            0x0025, ClusterInformation)
     AskClusterState, AnswerClusterState = register(
-            0x0028,
-            AskClusterState,
-            AnswerClusterState)
-    NotifyLastOID = register(0x0030, NotifyLastOID)
-    NotifyReplicationDone = register(0x0031, NotifyReplicationDone)
+            0x0026, ClusterState)
+    NotifyLastOID = register(
+            0x0027, NotifyLastOID)
+    NotifyReplicationDone = register(
+            0x0028, ReplicationDone)
     AskObjectUndoSerial, AnswerObjectUndoSerial = register(
-            0x0033,
-            AskObjectUndoSerial,
-            AnswerObjectUndoSerial)
+            0x0029, ObjectUndoSerial)
     AskHasLock, AnswerHasLock = register(
-            0x0034,
-            AskHasLock,
-            AnswerHasLock)
+            0x002A, HasLock)
     AskTIDsFrom, AnswerTIDsFrom = register(
-            0x0035,
-            AskTIDsFrom,
-            AnswerTIDsFrom)
+            0x002B, TIDListFrom)
     AskObjectHistoryFrom, AnswerObjectHistoryFrom = register(
-            0x0036,
-            AskObjectHistoryFrom,
-            AnswerObjectHistoryFrom)
+            0x002C, ObjectHistoryFrom)
     AskBarrier, AnswerBarrier = register(
-            0x0037,
-            AskBarrier,
-            AnswerBarrier)
+            0x002D, Barrier)
     AskPack, AnswerPack = register(
-            0x0038,
-            AskPack,
-            AnswerPack,
-            ignore_when_closed=False,
-    )
+            0x002E, Pack, ignore_when_closed=False)
     AskCheckTIDRange, AnswerCheckTIDRange = register(
-            0x0039,
-            AskCheckTIDRange,
-            AnswerCheckTIDRange,
-            )
+            0x002F, CheckTIDRange)
     AskCheckSerialRange, AnswerCheckSerialRange = register(
-            0x003A,
-            AskCheckSerialRange,
-            AnswerCheckSerialRange,
-            )
-    NotifyReady = register(0x003B, NotifyReady)
+            0x0030, CheckSerialRange)
+    NotifyReady = register(
+            0x0031, NotifyReady)
     AskLastTransaction, AnswerLastTransaction = register(
-            0x003C,
-            AskLastTransaction,
-            AnswerLastTransaction,
-            )
+            0x0032, LastTransaction)
     AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
-            0x003D,
-            AskCheckCurrentSerial,
-            AnswerCheckCurrentSerial,
-            )
+            0x0033, CheckCurrentSerial)
     NotifyTransactionFinished = register(
-            0x003E,
-            NotifyTransactionFinished,
-            )
+            0x003E, NotifyTransactionFinished)
 
 # build a "singleton"
 Packets = PacketRegistry()
@@ -2073,7 +1506,6 @@ class ErrorRegistry(dict):
     """
         Error packet packet registry
     """
-
     def __init__(self):
         dict.__init__(self)
 

Modified: trunk/neo/lib/util.py
==============================================================================
--- trunk/neo/lib/util.py [iso-8859-1] (original)
+++ trunk/neo/lib/util.py [iso-8859-1] Tue Feb  8 16:47:10 2011
@@ -22,6 +22,23 @@ from zlib import adler32
 from Queue import deque
 from struct import pack, unpack
 
+try:
+    from struct import Struct
+except ImportError:
+    import struct
+    # support for python 2.4
+    class Struct(object):
+
+        def __init__(self, fmt):
+            self._fmt = fmt
+            self.size = struct.calcsize(fmt)
+
+        def pack(self, *args):
+            return struct.pack(self._fmt, *args)
+
+        def unpack(self, *args):
+            return struct.unpack(self._fmt, *args)
+
 def u64(s):
     return unpack('!Q', s)[0]
 

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Tue Feb  8 16:47:10 2011
@@ -70,9 +70,9 @@ class ProtocolTests(NeoUnitTestBase):
 
     def test_11_RequestIdentification(self):
         uuid = self.getNewUUID()
-        p = Packets.RequestIdentification(NodeTypes.CLIENT, uuid,
-                                    ("127.0.0.1", 9080), "unittest")
-        node, p_uuid, (ip, port), name  = p.decode()
+        p = Packets.RequestIdentification(NodeTypes.CLIENT,
+                uuid, ("127.0.0.1", 9080), "unittest")
+        (plow, phigh), node, p_uuid, (ip, port), name  = p.decode()
         self.assertEqual(node, NodeTypes.CLIENT)
         self.assertEqual(p_uuid, uuid)
         self.assertEqual(ip, "127.0.0.1")
@@ -148,9 +148,11 @@ class ProtocolTests(NeoUnitTestBase):
         uuid1 = self.getNewUUID()
         uuid2 = self.getNewUUID()
         uuid3 = self.getNewUUID()
-        cell_list = [(0, ((uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE))),
-                     (43, ((uuid2, CellStates.OUT_OF_DATE),(uuid3, CellStates.DISCARDED))),
-                     (124, ((uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)))]
+        cell_list = [
+            (0, [(uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE)]),
+            (43, [(uuid2, CellStates.OUT_OF_DATE), (uuid3, CellStates.DISCARDED)]),
+            (124, [(uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)]),
+        ]
         p = Packets.AnswerPartitionTable(ptid, cell_list)
         pptid, p_cell_list  = p.decode()
         self.assertEqual(pptid, ptid)
@@ -161,9 +163,11 @@ class ProtocolTests(NeoUnitTestBase):
         uuid1 = self.getNewUUID()
         uuid2 = self.getNewUUID()
         uuid3 = self.getNewUUID()
-        cell_list = [(0, ((uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE))),
-                     (43, ((uuid2, CellStates.OUT_OF_DATE),(uuid3, CellStates.DISCARDED))),
-                     (124, ((uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)))]
+        cell_list = [
+            (0, [(uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE)]),
+            (43, [(uuid2, CellStates.OUT_OF_DATE), (uuid3, CellStates.DISCARDED)]),
+            (124, [(uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)]),
+        ]
         p = Packets.AnswerPartitionTable(ptid, cell_list)
         pptid, p_cell_list  = p.decode()
         self.assertEqual(pptid, ptid)
@@ -176,8 +180,7 @@ class ProtocolTests(NeoUnitTestBase):
         cell_list = [(0, uuid1, CellStates.UP_TO_DATE),
                      (43, uuid2, CellStates.OUT_OF_DATE),
                      (124, uuid1, CellStates.DISCARDED)]
-        p = Packets.NotifyPartitionChanges(ptid,
-                                 cell_list)
+        p = Packets.NotifyPartitionChanges(ptid, cell_list)
         pptid, p_cell_list  = p.decode()
         self.assertEqual(pptid, ptid)
         self.assertEqual(p_cell_list, cell_list)
@@ -235,7 +238,6 @@ class ProtocolTests(NeoUnitTestBase):
         ptid = p.decode()[0]
         self.assertEqual(ptid, tid)
 
-
     def test_32_askBeginTransaction(self):
         tid = self.getNextTID()
         p = Packets.AskBeginTransaction(tid)
@@ -370,11 +372,11 @@ class ProtocolTests(NeoUnitTestBase):
     def test_46_answerStoreObject(self):
         oid = self.getNextTID()
         serial = self.getNextTID()
-        p = Packets.AnswerStoreObject(1, oid, serial)
+        p = Packets.AnswerStoreObject(True, oid, serial)
         conflicting, poid, pserial = p.decode()
         self.assertEqual(oid, poid)
         self.assertEqual(serial, pserial)
-        self.assertEqual(conflicting, 1)
+        self.assertTrue(conflicting)
 
     def test_47_askObject(self):
         oid = self.getNextTID()
@@ -532,7 +534,7 @@ class ProtocolTests(NeoUnitTestBase):
 
     def test_AddPendingNodes(self):
         uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
-        p = Packets.AddPendingNodes([uuid1, uuid2])
+        p = Packets.AddPendingNodes((uuid1, uuid2))
         self.assertEqual(p.decode(), ([uuid1, uuid2], ))
 
     def test_SetNodeState(self):
@@ -551,7 +553,7 @@ class ProtocolTests(NeoUnitTestBase):
                 self.getNewUUID(), NodeStates.DOWN)
         node2 = (NodeTypes.MASTER, ('127.0.0.1', 2000),
                 self.getNewUUID(), NodeStates.RUNNING)
-        p = Packets.AnswerNodeList([node1, node2])
+        p = Packets.AnswerNodeList((node1, node2))
         self.assertEqual(p.decode(), ([node1, node2], ))
 
     def test_AskPartitionList(self):
@@ -564,14 +566,14 @@ class ProtocolTests(NeoUnitTestBase):
     def test_AnswerPartitionList(self):
         ptid = self.getPTID(1)
         row_list = [
-            (0, (
+            (0, [
                 (self.getNewUUID(), CellStates.UP_TO_DATE),
                 (self.getNewUUID(), CellStates.OUT_OF_DATE),
-                )),
-            (1, (
+                ]),
+            (1, [
                 (self.getNewUUID(), CellStates.FEEDING),
                 (self.getNewUUID(), CellStates.DISCARDED),
-                )),
+                ]),
         ]
         p = Packets.AnswerPartitionList(ptid, row_list)
         self.assertEqual(p.decode(), (ptid, row_list))




More information about the Neo-report mailing list