[Neo-report] r2778 jm - in /trunk: ./ neo/tests/ neo/tests/threaded/ tools/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Jun 14 12:04:32 CEST 2011
Author: jm
Date: Tue Jun 14 12:04:32 2011
New Revision: 2778
Log:
Add tool to synthesize a ZODB for benchmark
Added:
trunk/neo/tests/stat_zodb.py (with props)
Modified:
trunk/neo/tests/benchmark.py
trunk/neo/tests/threaded/__init__.py
trunk/neo/tests/threaded/test.py
trunk/setup.py
trunk/tools/matrix
trunk/tools/perfs
Modified: trunk/neo/tests/benchmark.py
==============================================================================
--- trunk/neo/tests/benchmark.py [iso-8859-1] (original)
+++ trunk/neo/tests/benchmark.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -8,7 +8,7 @@ import datetime
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
-from neo.tests.functional import NEOCluster
+from neo.lib import logger
MAIL_SERVER = '127.0.0.1:25'
@@ -41,13 +41,13 @@ class BenchmarkRunner(object):
# check specifics arguments
self._config = AttributeDict()
self._config.update(self.load_options(options, self._args))
- self._config.update(dict(
+ self._config.update(
title = options.title or self.__class__.__name__,
- verbose = options.verbose,
+ verbose = bool(options.verbose),
mail_from = options.mail_from,
mail_to = options.mail_to,
mail_server = mail_server.split(':'),
- ))
+ )
def add_status(self, key, value):
self._status.append((key, value))
@@ -91,6 +91,7 @@ class BenchmarkRunner(object):
s.close()
def run(self):
+ logger.PACKET_LOGGER.enable(self._config.verbose)
subject, report = self.start()
report = self.build_report(report)
if self._config.mail_to:
Added: trunk/neo/tests/stat_zodb.py
==============================================================================
--- trunk/neo/tests/stat_zodb.py (added)
+++ trunk/neo/tests/stat_zodb.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import math, os, random, sys
+from cStringIO import StringIO
+from ZODB.utils import p64
+from ZODB.BaseStorage import TransactionRecord
+from ZODB.FileStorage import FileStorage
+
+# Stats of a 43.5 GB production Data.fs
+# µ σ
+# size of object 6.04237779991 1.55811487853
+# # objects / transaction 1.04108991045 0.906703192546
+# size of transaction 7.98615420517 1.6624220402
+#
+# % of new object / transaction: 0.810080409164
+# # of transactions: 1541194
+# compression ratio: 28.5 % (gzip -6)
+PROD1 = lambda random=random: DummyZODB(6.04237779991, 1.55811487853,
+ 1.04108991045, 0.906703192546,
+ 0.810080409164, random)
+
+def DummyData(random=random):
+ # returns data that gzip at about 28.5 %
+ # make sure sample is bigger than dictionary of compressor
+ data = ''.join(chr(int(random.gauss(0, .8)) % 256) for x in xrange(100000))
+ return StringIO(data)
+
+
+class DummyZODB(object):
+ """
+ Object size and count of generated transaction follows a log normal
+ distribution, where *_mu and *_sigma are their parameters.
+ """
+
+ def __init__(self, obj_size_mu, obj_size_sigma,
+ obj_count_mu, obj_count_sigma,
+ new_ratio, random=random):
+ self.obj_size_mu = obj_size_mu
+ self.obj_size_sigma = obj_size_sigma
+ self.obj_count_mu = obj_count_mu
+ self.obj_count_sigma = obj_count_sigma
+ self.random = random
+ self.new_ratio = new_ratio
+ self.next_oid = 0
+ self.err_count = 0
+
+ def __call__(self):
+ variate = self.random.lognormvariate
+ oid_set = set()
+ for i in xrange(int(round(variate(self.obj_count_mu,
+ self.obj_count_sigma))) or 1):
+ if len(oid_set) >= self.next_oid or \
+ self.random.random() < self.new_ratio:
+ oid = self.next_oid
+ self.next_oid = oid + 1
+ else:
+ while True:
+ oid = self.random.randrange(self.next_oid)
+ if oid not in oid_set:
+ break
+ oid_set.add(oid)
+ yield p64(oid), int(round(variate(self.obj_size_mu,
+ self.obj_size_sigma))) or 1
+
+ def as_storage(self, transaction_count, dummy_data_file=None):
+ if dummy_data_file is None:
+ dummy_data_file = DummyData(self.random)
+ class dummy_change(object):
+ data_txn = None
+ version = ''
+ def __init__(self, tid, oid, size):
+ self.tid = tid
+ self.oid = oid
+ data = ''
+ while size:
+ d = dummy_data_file.read(size)
+ size -= len(d)
+ data += d
+ if size:
+ dummy_data_file.seek(0)
+ self.data = data
+ class dummy_transaction(TransactionRecord):
+ def __init__(transaction, *args):
+ TransactionRecord.__init__(transaction, *args)
+ transaction_size = 0
+ transaction.record_list = []
+ add_record = transaction.record_list.append
+ for x in self():
+ oid, size = x
+ transaction_size += size
+ add_record(dummy_change(transaction.tid, oid, size))
+ transaction.size = transaction_size
+ def __iter__(transaction):
+ return iter(transaction.record_list)
+ class dummy_storage(object):
+ size = 0
+ def iterator(storage, *args):
+ args = ' ', '', '', {}
+ for i in xrange(1, transaction_count+1):
+ t = dummy_transaction(p64(i), *args)
+ storage.size += t.size
+ yield t
+ def getSize(self):
+ return self.size
+ return dummy_storage()
+
+def lognorm_stat(X):
+ Y = map(math.log, X)
+ n = len(Y)
+ mu = sum(Y) / n
+ s2 = sum(d*d for d in (y - mu for y in Y)) / n
+ return mu, math.sqrt(s2)
+
+def stat(*storages):
+ obj_size_list = []
+ obj_count_list = []
+ tr_size_list = []
+ oid_set = set()
+ for storage in storages:
+ for transaction in storage.iterator():
+ obj_count = tr_size = 0
+ for r in transaction:
+ if r.data:
+ obj_count += 1
+ oid = r.oid
+ if oid not in oid_set:
+ oid_set.add(oid)
+ size = len(r.data)
+ tr_size += size
+ obj_size_list.append(size)
+ obj_count_list.append(obj_count)
+ tr_size_list.append(tr_size)
+ new_ratio = float(len(oid_set)) / len(obj_size_list)
+ return (lognorm_stat(obj_size_list),
+ lognorm_stat(obj_count_list),
+ lognorm_stat(tr_size_list),
+ new_ratio, len(tr_size_list))
+
+def main():
+ s = stat(*(FileStorage(x, read_only=True) for x in sys.argv[1:]))
+ print(u" %-15s σ\n"
+ "size of object %-15s %s\n"
+ "# objects / transaction %-15s %s\n"
+ "size of transaction %-15s %s\n"
+ "\n%% of new object / transaction: %s"
+ "\n# of transactions: %s"
+ % ((u"µ",) + s[0] + s[1] + s[2] + s[3:]))
+
+
+if __name__ == "__main__":
+ sys.exit(main())
Propchange: trunk/neo/tests/stat_zodb.py
------------------------------------------------------------------------------
svn:executable = *
Modified: trunk/neo/tests/threaded/__init__.py
==============================================================================
--- trunk/neo/tests/threaded/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/threaded/__init__.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -16,7 +16,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import os, random, socket, sys, threading, time, types
+import os, random, socket, sys, tempfile, threading, time, types
from collections import deque
from functools import wraps
from Queue import Queue, Empty
@@ -270,6 +270,20 @@ class NeoCTL(neo.neoctl.app.NeoCTL):
lambda self, address: setattr(self, '_server', address))
+class LoggerThreadName(object):
+
+ def __init__(self, default='TEST'):
+ self.__default = default
+
+ def __getattr__(self, attr):
+ return getattr(str(self), attr)
+
+ def __str__(self):
+ try:
+ return threading.currentThread().node_name
+ except AttributeError:
+ return self.__default
+
class NEOCluster(object):
BaseConnection_checkTimeout = staticmethod(BaseConnection.checkTimeout)
@@ -326,14 +340,21 @@ class NEOCluster(object):
def __init__(self, master_count=1, partitions=1, replicas=0,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'BTree'),
- storage_count=None, db_list=None,
- db_user='neo', db_password='neo'):
+ storage_count=None, db_list=None, clear_databases=True,
+ db_user='neo', db_password='neo', verbose=None):
+ if verbose is not None:
+ temp_dir = os.getenv('TEMP') or \
+ os.path.join(tempfile.gettempdir(), 'neo_tests')
+ os.path.exists(temp_dir) or os.makedirs(temp_dir)
+ log_file = tempfile.mkstemp('.log', '', temp_dir)[1]
+ print 'Logging to %r' % log_file
+ setupLog(LoggerThreadName(), log_file, verbose)
self.name = 'neo_%s' % random.randint(0, 100)
ip = getVirtualIp('master')
self.master_nodes = ' '.join('%s:%s' % (ip, i)
for i in xrange(master_count))
kw = dict(cluster=self, getReplicas=replicas, getPartitions=partitions,
- getAdapter=adapter, getReset=True)
+ getAdapter=adapter, getReset=clear_databases)
self.master_list = [MasterApplication(address=(ip, i), **kw)
for i in xrange(master_count)]
ip = getVirtualIp('storage')
@@ -383,7 +404,7 @@ class NEOCluster(object):
self.client = ClientApplication(self)
self.neoctl = NeoCTL(self)
- def start(self, client=False, storage_list=None, fast_startup=True):
+ def start(self, storage_list=None, fast_startup=True):
self.__class__._cluster = weak_ref(self)
for node_type in 'master', 'admin':
for node in getattr(self, node_type + '_list'):
@@ -401,8 +422,6 @@ class NEOCluster(object):
self.tic()
assert self.neoctl.getClusterState() == ClusterStates.RUNNING
self.enableStorageList(storage_list)
- if client:
- self.startClient()
def enableStorageList(self, storage_list):
self.neoctl.enableStorageList([x.uuid for x in storage_list])
@@ -410,13 +429,16 @@ class NEOCluster(object):
for node in storage_list:
assert self.getNodeState(node) == NodeStates.RUNNING
- def startClient(self):
- self.client.setPoll(True)
- self.db = ZODB.DB(storage=self.getZODBStorage())
+ @property
+ def db(self):
+ try:
+ return self._db
+ except AttributeError:
+ self._db = db = ZODB.DB(storage=self.getZODBStorage())
+ return db
def stop(self):
- if hasattr(self, 'db'):
- self.db.close()
+ getattr(self, '_db', self.client).close()
#self.neoctl.setClusterState(ClusterStates.STOPPING) # TODO
try:
Serialized.release(stop=1)
@@ -446,6 +468,9 @@ class NEOCluster(object):
if cell[1] == CellStates.OUT_OF_DATE]
def getZODBStorage(self, **kw):
+ # automatically put client in master mode
+ if self.client.em._timeout == 0:
+ self.client.setPoll(True)
return Storage.Storage(None, self.name, _app=self.client, **kw)
def getTransaction(self):
@@ -453,17 +478,6 @@ class NEOCluster(object):
return txn, self.db.open(txn)
-class LoggerThreadName(object):
-
- def __getattr__(self, attr):
- return getattr(str(self), attr)
-
- def __str__(self):
- try:
- return threading.currentThread().node_name
- except AttributeError:
- return 'TEST'
-
class NEOThreadedTest(NeoUnitTestBase):
def setupLog(self):
Modified: trunk/neo/tests/threaded/test.py
==============================================================================
--- trunk/neo/tests/threaded/test.py [iso-8859-1] (original)
+++ trunk/neo/tests/threaded/test.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -29,7 +29,7 @@ class Test(NEOThreadedTest):
def test_commit(self):
cluster = NEOCluster()
- cluster.start(1)
+ cluster.start()
try:
t, c = cluster.getTransaction()
c.root()['foo'] = PObject()
@@ -42,7 +42,8 @@ class Test(NEOThreadedTest):
# (neo.tests.client.testMasterHandler)
cluster = NEOCluster()
try:
- cluster.start(1)
+ cluster.start()
+ cluster.db # open DB
cluster.client.setPoll(0)
storage, = cluster.client.nm.getStorageList()
conn = storage.getConnection()
Modified: trunk/setup.py
==============================================================================
--- trunk/setup.py [iso-8859-1] (original)
+++ trunk/setup.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -42,6 +42,7 @@ setup(
'neostorage=neo.scripts.neostorage:main',
'neotestrunner=neo.scripts.runner:main',
'neosimple=neo.scripts.simple:main',
+ 'stat_zodb=neo.tests.stat_zodb:main',
],
},
# Raah!!! I wish I could write something like:
Modified: trunk/tools/matrix
==============================================================================
--- trunk/tools/matrix [iso-8859-1] (original)
+++ trunk/tools/matrix [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -7,7 +7,6 @@ import traceback
from time import time
from neo.tests.benchmark import BenchmarkRunner
-from neo.tests.functional import NEOCluster
from ZODB.FileStorage import FileStorage
MIN_STORAGES = 1
@@ -25,9 +24,10 @@ class MatrixImportBenchmark(BenchmarkRun
parser.add_option('', '--max-storages')
parser.add_option('', '--min-replicas')
parser.add_option('', '--max-replicas')
+ parser.add_option('', '--threaded', action="store_true")
def load_options(self, options, args):
- if not options.datafs or not os.path.exists(options.datafs):
+ if options.datafs and not os.path.exists(options.datafs):
sys.exit('Missing or wrong data.fs argument')
return dict(
datafs = options.datafs,
@@ -35,6 +35,7 @@ class MatrixImportBenchmark(BenchmarkRun
max_s = int(options.max_storages or MAX_STORAGES),
min_r = int(options.min_replicas or MIN_REPLICAS),
max_r = int(options.max_replicas or MAX_REPLICAS),
+ threaded = options.threaded,
)
def start(self):
@@ -49,42 +50,59 @@ class MatrixImportBenchmark(BenchmarkRun
if storages[-1] < max_s:
storages.append(max_s)
replicas = range(min_r, max_r + 1)
- results = self.runMatrix(storages, replicas)
+ if self._config.threaded:
+ from neo.tests.threaded import NEOCluster
+ NEOCluster.patch() # XXX ugly
+ try:
+ results = self.runMatrix(storages, replicas)
+ finally:
+ if self._config.threaded:
+ from neo.tests.threaded import NEOCluster
+ NEOCluster.unpatch()# XXX ugly
return self.buildReport(storages, replicas, results)
def runMatrix(self, storages, replicas):
stats = {}
- size = float(os.path.getsize(self._config.datafs))
for s in storages:
for r in [r for r in replicas if r < s]:
stats.setdefault(s, {})
- result = self.runImport(1, s, r, 100)
- if result is not None:
- result = size / result / 1024
- stats[s][r] = result
+ stats[s][r] = self.runImport(1, s, r, 100)
return stats
def runImport(self, masters, storages, replicas, partitions):
+ datafs = self._config.datafs
+ if datafs:
+ dfs_storage = FileStorage(file_name=self._config.datafs)
+ else:
+ datafs = 'PROD1'
+ import random, neo.tests.stat_zodb
+ dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
+ random.Random(0)).as_storage(100)
print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
- self._config.datafs, masters, storages, replicas, partitions)
+ datafs, masters, storages, replicas, partitions)
# cluster
- neo = NEOCluster(
+ kw = dict(
db_list=['neot_matrix_%d' % i for i in xrange(storages)],
clear_databases=True,
partitions=partitions,
replicas=replicas,
- master_node_count=masters,
verbose=self._config.verbose,
)
- # import
- neo_storage = neo.getZODBStorage()
- dfs_storage = FileStorage(file_name=self._config.datafs)
+ if self._config.threaded:
+ from neo.tests.threaded import NEOCluster
+ neo = NEOCluster(master_count=masters, **kw)
+ else:
+ from neo.tests.functional import NEOCluster
+ neo = NEOCluster(master_node_count=masters, **kw)
neo.start()
+ neo_storage = neo.getZODBStorage()
+ # import
start = time()
try:
try:
neo_storage.copyTransactionsFrom(dfs_storage)
- return time() - start
+ end = time()
+ return dfs_storage.getSize() / ((end - start) * 1e3)
except:
traceback.print_exc()
self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
Modified: trunk/tools/perfs
==============================================================================
--- trunk/tools/perfs [iso-8859-1] (original)
+++ trunk/tools/perfs [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -24,7 +24,7 @@ class ImportBenchmark(BenchmarkRunner):
parser.add_option('-r', '--replicas')
def load_options(self, options, args):
- if not options.datafs or not os.path.exists(options.datafs):
+ if options.datafs and not os.path.exists(options.datafs):
sys.exit('Missing or wrong data.fs argument')
return dict(
datafs = options.datafs,
@@ -74,8 +74,12 @@ class ImportBenchmark(BenchmarkRunner):
# open storages clients
datafs = self._config.datafs
neo_storage = neo.getZODBStorage()
- dfs_storage = FileStorage(file_name=datafs)
- dfs_size = os.path.getsize(datafs)
+ if datafs:
+ dfs_storage = FileStorage(file_name=datafs)
+ else:
+ from neo.tests.stat_zodb import PROD1
+ from random import Random
+ dfs_storage = PROD1(Random(0)).as_storage(10000)
# monkey patch storage
txn_dict, obj_dict = {}, {}
@@ -92,13 +96,13 @@ class ImportBenchmark(BenchmarkRunner):
'Transactions': txn_dict.values(),
'Objects': obj_dict.values(),
}
- return (dfs_size, elapsed, stats)
+ return (dfs_storage.getSize(), elapsed, stats)
def buildReport(self, dfs_size, elapsed, stats):
""" build a report for the given import data """
config = self._config
- dfs_size /= 1024
- size = dfs_size / 1024
+ dfs_size /= 1e3
+ size = dfs_size / 1e3
speed = dfs_size / elapsed
# configuration
More information about the Neo-report
mailing list