[Neo-report] r1911 vincent - in /trunk: neo/ neo/client/ tools/

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Mar 8 11:31:22 CET 2010


Author: vincent
Date: Mon Mar  8 11:31:21 2010
New Revision: 1911

Log:
Add tiny_profiler measure points.

Given measure points were used to search for hot spots in client around
"store" method.

Modified:
    trunk/neo/client/app.py
    trunk/neo/client/pool.py
    trunk/neo/connection.py
    trunk/neo/dispatcher.py
    trunk/neo/logger.py
    trunk/neo/protocol.py
    trunk/tools/perfs

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Mar  8 11:31:21 2010
@@ -17,7 +17,7 @@
 
 from thread import get_ident
 from cPickle import dumps
-from zlib import compress, decompress
+from zlib import compress as real_compress, decompress
 from neo.locking import Queue, Empty
 from random import shuffle
 from time import sleep
@@ -32,7 +32,7 @@
 from neo import protocol
 from neo.protocol import NodeTypes, Packets
 from neo.event import EventManager
-from neo.util import makeChecksum, dump
+from neo.util import makeChecksum as real_makeChecksum, dump
 from neo.locking import Lock
 from neo.connection import MTClientConnection
 from neo.node import NodeManager
@@ -47,7 +47,22 @@
 from neo.client.mq import MQ
 from neo.client.pool import ConnectionPool
 from neo.util import u64, parseMasterList
-
+from neo.profiling import profiler_decorator, PROFILING_ENABLED
+
+if PROFILING_ENABLED:
+    # Those functions require a "real" python function wrapper before they can
+    # be decorated.
+    @profiler_decorator
+    def compress(data):
+        return real_compress(data)
+
+    @profiler_decorator
+    def makeChecksum(data):
+        return real_makeChecksum(data)
+else:
+    # If profiling is disabled, directly use original functions.
+    compress = real_compress
+    makeChecksum = real_makeChecksum
 
 class ThreadContext(object):
 
@@ -161,6 +176,7 @@
         self._nm_acquire = lock.acquire
         self._nm_release = lock.release
 
+    @profiler_decorator
     def _handlePacket(self, conn, packet, handler=None):
         """
           conn
@@ -186,6 +202,7 @@
                 raise ValueError, 'Unknown node type: %r' % (node.__class__, )
         handler.dispatch(conn, packet)
 
+    @profiler_decorator
     def _waitAnyMessage(self, block=True):
         """
           Handle all pending packets.
@@ -209,6 +226,7 @@
             except ConnectionClosed:
                 pass
 
+    @profiler_decorator
     def _waitMessage(self, target_conn, msg_id, handler=None):
         """Wait for a message returned by the dispatcher in queues."""
         get = self.local_var.queue.get
@@ -225,6 +243,7 @@
             elif packet is not None:
                 self._handlePacket(conn, packet)
 
+    @profiler_decorator
     def _askStorage(self, conn, packet):
         """ Send a request to a storage node and process it's answer """
         try:
@@ -234,6 +253,7 @@
             conn.unlock()
         self._waitMessage(conn, msg_id, self.storage_handler)
 
+    @profiler_decorator
     def _askPrimary(self, packet):
         """ Send a request to the primary master and process it's answer """
         conn = self._getMasterConnection()
@@ -244,6 +264,7 @@
             conn.unlock()
         self._waitMessage(conn, msg_id, self.primary_handler)
 
+    @profiler_decorator
     def _getMasterConnection(self):
         """ Connect to the primary master node on demand """
         # acquire the lock to allow only one thread to connect to the primary
@@ -265,6 +286,7 @@
         self._getMasterConnection()
         return self.pt
 
+    @profiler_decorator
     def _getCellListForOID(self, oid, readable=False, writable=False):
         """ Return the cells available for the specified OID """
         pt = self._getPartitionTable()
@@ -275,6 +297,7 @@
         pt = self._getPartitionTable()
         return pt.getCellListForTID(tid, readable, writable)
 
+    @profiler_decorator
     def _connectToPrimaryNode(self):
         logging.debug('connecting to primary master...')
         ready = False
@@ -377,6 +400,7 @@
     def getDB(self):
         return self._db
 
+    @profiler_decorator
     def new_oid(self):
         """Get a new OID."""
         self._oid_lock_acquire()
@@ -398,6 +422,7 @@
         # return the last OID used, this is innacurate
         return int(u64(self.last_oid))
 
+    @profiler_decorator
     def getSerial(self, oid):
         # Try in cache first
         self._cache_lock_acquire()
@@ -415,6 +440,7 @@
         return hist[1][0][0]
 
 
+    @profiler_decorator
     def _load(self, oid, serial=None, tid=None, cache=0):
         """Internal method which manage load ,loadSerial and loadBefore."""
         cell_list = self._getCellListForOID(oid, readable=True)
@@ -489,6 +515,7 @@
         return data, start_serial, end_serial
 
 
+    @profiler_decorator
     def load(self, oid, version=None):
         """Load an object for a given oid."""
         # First try from cache
@@ -508,6 +535,7 @@
             self._load_lock_release()
 
 
+    @profiler_decorator
     def loadSerial(self, oid, serial):
         """Load an object for a given oid and serial."""
         # Do not try in cache as it manages only up-to-date object
@@ -515,6 +543,7 @@
         return self._load(oid, serial=serial)[0]
 
 
+    @profiler_decorator
     def loadBefore(self, oid, tid):
         """Load an object for a given oid before tid committed."""
         # Do not try in cache as it manages only up-to-date object
@@ -527,6 +556,7 @@
             return data, start, end
 
 
+    @profiler_decorator
     def tpc_begin(self, transaction, tid=None, status=' '):
         """Begin a new transaction."""
         # First get a transaction, only one is allowed at a time
@@ -543,6 +573,7 @@
         self.local_var.txn = transaction
 
 
+    @profiler_decorator
     def store(self, oid, serial, data, version, transaction):
         """Store object."""
         if transaction is not self.local_var.txn:
@@ -586,6 +617,7 @@
         self._waitAnyMessage(False)
         return None
 
+    @profiler_decorator
     def _handleConflicts(self, tryToResolveConflict):
         result = []
         append = result.append
@@ -616,6 +648,7 @@
                     serials=(tid, serial), data=data)
         return result
 
+    @profiler_decorator
     def waitStoreResponses(self, tryToResolveConflict):
         result = []
         append = result.append
@@ -654,6 +687,7 @@
                     append((oid, tid))
         return result
 
+    @profiler_decorator
     def tpc_vote(self, transaction, tryToResolveConflict):
         """Store current transaction."""
         local_var = self.local_var
@@ -696,6 +730,7 @@
 
         return result
 
+    @profiler_decorator
     def tpc_abort(self, transaction):
         """Abort current transaction."""
         if transaction is not self.local_var.txn:
@@ -732,6 +767,7 @@
             conn.unlock()
         self.local_var.clear()
 
+    @profiler_decorator
     def tpc_finish(self, transaction, f=None):
         """Finish current transaction."""
         if self.local_var.txn is not transaction:

Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Mar  8 11:31:21 2010
@@ -21,6 +21,7 @@
 from neo.protocol import NodeTypes, Packets
 from neo.connection import MTClientConnection
 from neo.client.exception import ConnectionClosed
+from neo.profiling import profiler_decorator
 
 class ConnectionPool(object):
     """This class manages a pool of connections to storage nodes."""
@@ -36,6 +37,7 @@
         self.connection_lock_acquire = l.acquire
         self.connection_lock_release = l.release
 
+    @profiler_decorator
     def _initNodeConnection(self, node):
         """Init a connection to a given storage node."""
         addr = node.getAddress()
@@ -78,6 +80,7 @@
                 logging.info('Storage node %s not ready', node)
                 return None
 
+    @profiler_decorator
     def _dropConnections(self):
         """Drop connections."""
         for node_uuid, conn in self.connection_dict.items():
@@ -95,6 +98,7 @@
             finally:
                 conn.unlock()
 
+    @profiler_decorator
     def _createNodeConnection(self, node):
         """Create a connection to a given storage node."""
         if len(self.connection_dict) > self.max_pool_size:
@@ -114,9 +118,11 @@
         conn.lock()
         return conn
 
+    @profiler_decorator
     def getConnForCell(self, cell):
         return self.getConnForNode(cell.getNode())
 
+    @profiler_decorator
     def getConnForNode(self, node):
         """Return a locked connection object to a given node
         If no connection exists, create a new one"""
@@ -136,6 +142,7 @@
         finally:
             self.connection_lock_release()
 
+    @profiler_decorator
     def removeConnection(self, node):
         """Explicitly remove connection when a node is broken."""
         self.connection_lock_acquire()

Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar  8 11:31:21 2010
@@ -28,6 +28,7 @@
 from neo.logger import PACKET_LOGGER
 
 from neo import attributeTracker
+from neo.profiling import profiler_decorator
 
 PING_DELAY = 5
 PING_TIMEOUT = 5
@@ -312,6 +313,7 @@
     def getPeerId(self):
         return self.peer_id
 
+    @profiler_decorator
     def _getNextId(self):
         next_id = self.cur_id
         self.cur_id = (next_id + 1) & 0xffffffff
@@ -405,6 +407,7 @@
         else:
             handler.connectionClosed(self)
 
+    @profiler_decorator
     def _recv(self):
         """Receive data from a connector."""
         try:
@@ -430,6 +433,7 @@
             # unhandled connector exception
             raise
 
+    @profiler_decorator
     def _send(self):
         """Send data to a connector."""
         if not self.write_buf:
@@ -457,6 +461,7 @@
             self._closure()
             raise
 
+    @profiler_decorator
     def _addPacket(self, packet):
         """Add a packet into the write buffer."""
         if self.connector is None:
@@ -479,6 +484,7 @@
         self._addPacket(packet)
         return msg_id
 
+    @profiler_decorator
     @not_closed
     def ask(self, packet, timeout=CRITICAL_TIMEOUT):
         """

Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Mon Mar  8 11:31:21 2010
@@ -16,6 +16,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 from neo.locking import Lock
+from neo.profiling import profiler_decorator
 EMPTY = {}
 
 def giant_lock(func):
@@ -38,6 +39,7 @@
         self.lock_release = lock.release
 
     @giant_lock
+    @profiler_decorator
     def dispatch(self, conn, msg_id, data):
         """Retrieve register-time provided queue, and put data in it."""
         queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None)
@@ -48,6 +50,7 @@
         return True
 
     @giant_lock
+    @profiler_decorator
     def register(self, conn, msg_id, queue):
         """Register an expectation for a reply."""
         self.message_table.setdefault(id(conn), {})[msg_id] = queue
@@ -58,6 +61,7 @@
         except KeyError:
             queue_dict[key] = 1
 
+    @profiler_decorator
     def unregister(self, conn):
         """ Unregister a connection and put fake packet in queues to unlock
         threads excepting responses from that connection """
@@ -75,11 +79,13 @@
                 notified_set.add(queue_id)
             queue_dict[queue_id] -= 1
 
+    @profiler_decorator
     def registered(self, conn):
         """Check if a connection is registered into message table."""
         return len(self.message_table.get(id(conn), EMPTY)) != 0
 
     @giant_lock
+    @profiler_decorator
     def pending(self, queue):
         return not queue.empty() or self.queue_dict[id(queue)] > 0
 

Modified: trunk/neo/logger.py
==============================================================================
--- trunk/neo/logger.py [iso-8859-1] (original)
+++ trunk/neo/logger.py [iso-8859-1] Mon Mar  8 11:31:21 2010
@@ -19,6 +19,7 @@
 from neo.protocol import PacketMalformedError
 from neo.util import dump
 from neo.handler import EventHandler
+from neo.profiling import profiler_decorator
 
 class PacketLogger(EventHandler):
     """ Logger at packet level (for debugging purpose) """
@@ -26,6 +27,7 @@
     def __init__(self):
         EventHandler.__init__(self, None)
 
+    @profiler_decorator
     def dispatch(self, conn, packet, direction):
         """This is a helper method to handle various packet types."""
         # default log message

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Mon Mar  8 11:31:21 2010
@@ -17,6 +17,7 @@
 
 from struct import pack, unpack, error, calcsize
 from socket import inet_ntoa, inet_aton
+from neo.profiling import profiler_decorator
 
 from neo.util import Enum
 
@@ -263,6 +264,7 @@
     def getType(self):
         return self.__class__
 
+    @profiler_decorator
     def encode(self):
         """ Encode a packet as a string to send it over the network """
         content = self._body

Modified: trunk/tools/perfs
==============================================================================
--- trunk/tools/perfs [iso-8859-1] (original)
+++ trunk/tools/perfs [iso-8859-1] Mon Mar  8 11:31:21 2010
@@ -10,9 +10,12 @@
 from neo.tests.functional import NEOCluster
 from neo.client.Storage import Storage
 from ZODB.FileStorage import FileStorage
+from neo.profiling import PROFILING_ENABLED, profiler_decorator, \
+    profiler_report
 
 def runImport(neo, datafs):
 
+    @profiler_decorator
     def _copyTransactionsFrom(self, other):
         """ taken from ZODB.BaseStorage that build stat during import """
         def inc(d):
@@ -182,6 +185,9 @@
     summary, report = buildReport(config, *runImport(neo, datafs))
     neo.stop()
 
+    if PROFILING_ENABLED:
+        print profiler_report()
+
     # display and/or send the report
     print summary
     print report





More information about the Neo-report mailing list