[Neo-report] r1911 vincent - in /trunk: neo/ neo/client/ tools/
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Mar 8 11:31:22 CET 2010
Author: vincent
Date: Mon Mar 8 11:31:21 2010
New Revision: 1911
Log:
Add tiny_profiler measure points.
These measure points were used to search for hot spots in the client around
the "store" method.
Modified:
trunk/neo/client/app.py
trunk/neo/client/pool.py
trunk/neo/connection.py
trunk/neo/dispatcher.py
trunk/neo/logger.py
trunk/neo/protocol.py
trunk/tools/perfs
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Mar 8 11:31:21 2010
@@ -17,7 +17,7 @@
from thread import get_ident
from cPickle import dumps
-from zlib import compress, decompress
+from zlib import compress as real_compress, decompress
from neo.locking import Queue, Empty
from random import shuffle
from time import sleep
@@ -32,7 +32,7 @@
from neo import protocol
from neo.protocol import NodeTypes, Packets
from neo.event import EventManager
-from neo.util import makeChecksum, dump
+from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock
from neo.connection import MTClientConnection
from neo.node import NodeManager
@@ -47,7 +47,22 @@
from neo.client.mq import MQ
from neo.client.pool import ConnectionPool
from neo.util import u64, parseMasterList
-
+from neo.profiling import profiler_decorator, PROFILING_ENABLED
+
+if PROFILING_ENABLED:
+ # Those functions require a "real" python function wrapper before they can
+ # be decorated.
+ @profiler_decorator
+ def compress(data):
+ return real_compress(data)
+
+ @profiler_decorator
+ def makeChecksum(data):
+ return real_makeChecksum(data)
+else:
+ # If profiling is disabled, directly use original functions.
+ compress = real_compress
+ makeChecksum = real_makeChecksum
class ThreadContext(object):
@@ -161,6 +176,7 @@
self._nm_acquire = lock.acquire
self._nm_release = lock.release
+ @profiler_decorator
def _handlePacket(self, conn, packet, handler=None):
"""
conn
@@ -186,6 +202,7 @@
raise ValueError, 'Unknown node type: %r' % (node.__class__, )
handler.dispatch(conn, packet)
+ @profiler_decorator
def _waitAnyMessage(self, block=True):
"""
Handle all pending packets.
@@ -209,6 +226,7 @@
except ConnectionClosed:
pass
+ @profiler_decorator
def _waitMessage(self, target_conn, msg_id, handler=None):
"""Wait for a message returned by the dispatcher in queues."""
get = self.local_var.queue.get
@@ -225,6 +243,7 @@
elif packet is not None:
self._handlePacket(conn, packet)
+ @profiler_decorator
def _askStorage(self, conn, packet):
""" Send a request to a storage node and process it's answer """
try:
@@ -234,6 +253,7 @@
conn.unlock()
self._waitMessage(conn, msg_id, self.storage_handler)
+ @profiler_decorator
def _askPrimary(self, packet):
""" Send a request to the primary master and process it's answer """
conn = self._getMasterConnection()
@@ -244,6 +264,7 @@
conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler)
+ @profiler_decorator
def _getMasterConnection(self):
""" Connect to the primary master node on demand """
# acquire the lock to allow only one thread to connect to the primary
@@ -265,6 +286,7 @@
self._getMasterConnection()
return self.pt
+ @profiler_decorator
def _getCellListForOID(self, oid, readable=False, writable=False):
""" Return the cells available for the specified OID """
pt = self._getPartitionTable()
@@ -275,6 +297,7 @@
pt = self._getPartitionTable()
return pt.getCellListForTID(tid, readable, writable)
+ @profiler_decorator
def _connectToPrimaryNode(self):
logging.debug('connecting to primary master...')
ready = False
@@ -377,6 +400,7 @@
def getDB(self):
return self._db
+ @profiler_decorator
def new_oid(self):
"""Get a new OID."""
self._oid_lock_acquire()
@@ -398,6 +422,7 @@
# return the last OID used, this is innacurate
return int(u64(self.last_oid))
+ @profiler_decorator
def getSerial(self, oid):
# Try in cache first
self._cache_lock_acquire()
@@ -415,6 +440,7 @@
return hist[1][0][0]
+ @profiler_decorator
def _load(self, oid, serial=None, tid=None, cache=0):
"""Internal method which manage load ,loadSerial and loadBefore."""
cell_list = self._getCellListForOID(oid, readable=True)
@@ -489,6 +515,7 @@
return data, start_serial, end_serial
+ @profiler_decorator
def load(self, oid, version=None):
"""Load an object for a given oid."""
# First try from cache
@@ -508,6 +535,7 @@
self._load_lock_release()
+ @profiler_decorator
def loadSerial(self, oid, serial):
"""Load an object for a given oid and serial."""
# Do not try in cache as it manages only up-to-date object
@@ -515,6 +543,7 @@
return self._load(oid, serial=serial)[0]
+ @profiler_decorator
def loadBefore(self, oid, tid):
"""Load an object for a given oid before tid committed."""
# Do not try in cache as it manages only up-to-date object
@@ -527,6 +556,7 @@
return data, start, end
+ @profiler_decorator
def tpc_begin(self, transaction, tid=None, status=' '):
"""Begin a new transaction."""
# First get a transaction, only one is allowed at a time
@@ -543,6 +573,7 @@
self.local_var.txn = transaction
+ @profiler_decorator
def store(self, oid, serial, data, version, transaction):
"""Store object."""
if transaction is not self.local_var.txn:
@@ -586,6 +617,7 @@
self._waitAnyMessage(False)
return None
+ @profiler_decorator
def _handleConflicts(self, tryToResolveConflict):
result = []
append = result.append
@@ -616,6 +648,7 @@
serials=(tid, serial), data=data)
return result
+ @profiler_decorator
def waitStoreResponses(self, tryToResolveConflict):
result = []
append = result.append
@@ -654,6 +687,7 @@
append((oid, tid))
return result
+ @profiler_decorator
def tpc_vote(self, transaction, tryToResolveConflict):
"""Store current transaction."""
local_var = self.local_var
@@ -696,6 +730,7 @@
return result
+ @profiler_decorator
def tpc_abort(self, transaction):
"""Abort current transaction."""
if transaction is not self.local_var.txn:
@@ -732,6 +767,7 @@
conn.unlock()
self.local_var.clear()
+ @profiler_decorator
def tpc_finish(self, transaction, f=None):
"""Finish current transaction."""
if self.local_var.txn is not transaction:
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Mar 8 11:31:21 2010
@@ -21,6 +21,7 @@
from neo.protocol import NodeTypes, Packets
from neo.connection import MTClientConnection
from neo.client.exception import ConnectionClosed
+from neo.profiling import profiler_decorator
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
@@ -36,6 +37,7 @@
self.connection_lock_acquire = l.acquire
self.connection_lock_release = l.release
+ @profiler_decorator
def _initNodeConnection(self, node):
"""Init a connection to a given storage node."""
addr = node.getAddress()
@@ -78,6 +80,7 @@
logging.info('Storage node %s not ready', node)
return None
+ @profiler_decorator
def _dropConnections(self):
"""Drop connections."""
for node_uuid, conn in self.connection_dict.items():
@@ -95,6 +98,7 @@
finally:
conn.unlock()
+ @profiler_decorator
def _createNodeConnection(self, node):
"""Create a connection to a given storage node."""
if len(self.connection_dict) > self.max_pool_size:
@@ -114,9 +118,11 @@
conn.lock()
return conn
+ @profiler_decorator
def getConnForCell(self, cell):
return self.getConnForNode(cell.getNode())
+ @profiler_decorator
def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
@@ -136,6 +142,7 @@
finally:
self.connection_lock_release()
+ @profiler_decorator
def removeConnection(self, node):
"""Explicitly remove connection when a node is broken."""
self.connection_lock_acquire()
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Mar 8 11:31:21 2010
@@ -28,6 +28,7 @@
from neo.logger import PACKET_LOGGER
from neo import attributeTracker
+from neo.profiling import profiler_decorator
PING_DELAY = 5
PING_TIMEOUT = 5
@@ -312,6 +313,7 @@
def getPeerId(self):
return self.peer_id
+ @profiler_decorator
def _getNextId(self):
next_id = self.cur_id
self.cur_id = (next_id + 1) & 0xffffffff
@@ -405,6 +407,7 @@
else:
handler.connectionClosed(self)
+ @profiler_decorator
def _recv(self):
"""Receive data from a connector."""
try:
@@ -430,6 +433,7 @@
# unhandled connector exception
raise
+ @profiler_decorator
def _send(self):
"""Send data to a connector."""
if not self.write_buf:
@@ -457,6 +461,7 @@
self._closure()
raise
+ @profiler_decorator
def _addPacket(self, packet):
"""Add a packet into the write buffer."""
if self.connector is None:
@@ -479,6 +484,7 @@
self._addPacket(packet)
return msg_id
+ @profiler_decorator
@not_closed
def ask(self, packet, timeout=CRITICAL_TIMEOUT):
"""
Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Mon Mar 8 11:31:21 2010
@@ -16,6 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.locking import Lock
+from neo.profiling import profiler_decorator
EMPTY = {}
def giant_lock(func):
@@ -38,6 +39,7 @@
self.lock_release = lock.release
@giant_lock
+ @profiler_decorator
def dispatch(self, conn, msg_id, data):
"""Retrieve register-time provided queue, and put data in it."""
queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None)
@@ -48,6 +50,7 @@
return True
@giant_lock
+ @profiler_decorator
def register(self, conn, msg_id, queue):
"""Register an expectation for a reply."""
self.message_table.setdefault(id(conn), {})[msg_id] = queue
@@ -58,6 +61,7 @@
except KeyError:
queue_dict[key] = 1
+ @profiler_decorator
def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock
threads excepting responses from that connection """
@@ -75,11 +79,13 @@
notified_set.add(queue_id)
queue_dict[queue_id] -= 1
+ @profiler_decorator
def registered(self, conn):
"""Check if a connection is registered into message table."""
return len(self.message_table.get(id(conn), EMPTY)) != 0
@giant_lock
+ @profiler_decorator
def pending(self, queue):
return not queue.empty() or self.queue_dict[id(queue)] > 0
Modified: trunk/neo/logger.py
==============================================================================
--- trunk/neo/logger.py [iso-8859-1] (original)
+++ trunk/neo/logger.py [iso-8859-1] Mon Mar 8 11:31:21 2010
@@ -19,6 +19,7 @@
from neo.protocol import PacketMalformedError
from neo.util import dump
from neo.handler import EventHandler
+from neo.profiling import profiler_decorator
class PacketLogger(EventHandler):
""" Logger at packet level (for debugging purpose) """
@@ -26,6 +27,7 @@
def __init__(self):
EventHandler.__init__(self, None)
+ @profiler_decorator
def dispatch(self, conn, packet, direction):
"""This is a helper method to handle various packet types."""
# default log message
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Mon Mar 8 11:31:21 2010
@@ -17,6 +17,7 @@
from struct import pack, unpack, error, calcsize
from socket import inet_ntoa, inet_aton
+from neo.profiling import profiler_decorator
from neo.util import Enum
@@ -263,6 +264,7 @@
def getType(self):
return self.__class__
+ @profiler_decorator
def encode(self):
""" Encode a packet as a string to send it over the network """
content = self._body
Modified: trunk/tools/perfs
==============================================================================
--- trunk/tools/perfs [iso-8859-1] (original)
+++ trunk/tools/perfs [iso-8859-1] Mon Mar 8 11:31:21 2010
@@ -10,9 +10,12 @@
from neo.tests.functional import NEOCluster
from neo.client.Storage import Storage
from ZODB.FileStorage import FileStorage
+from neo.profiling import PROFILING_ENABLED, profiler_decorator, \
+ profiler_report
def runImport(neo, datafs):
+ @profiler_decorator
def _copyTransactionsFrom(self, other):
""" taken from ZODB.BaseStorage that build stat during import """
def inc(d):
@@ -182,6 +185,9 @@
summary, report = buildReport(config, *runImport(neo, datafs))
neo.stop()
+ if PROFILING_ENABLED:
+ print profiler_report()
+
# display and/or send the report
print summary
print report
More information about the Neo-report
mailing list