[Neo-report] r2778 jm - in /trunk: ./ neo/tests/ neo/tests/threaded/ tools/

nobody at svn.erp5.org
Tue Jun 14 12:04:32 CEST 2011


Author: jm
Date: Tue Jun 14 12:04:32 2011
New Revision: 2778

Log:
Add tool to synthesize a ZODB for benchmark

Added:
    trunk/neo/tests/stat_zodb.py   (with props)
Modified:
    trunk/neo/tests/benchmark.py
    trunk/neo/tests/threaded/__init__.py
    trunk/neo/tests/threaded/test.py
    trunk/setup.py
    trunk/tools/matrix
    trunk/tools/perfs

Modified: trunk/neo/tests/benchmark.py
==============================================================================
--- trunk/neo/tests/benchmark.py [iso-8859-1] (original)
+++ trunk/neo/tests/benchmark.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -8,7 +8,7 @@ import datetime
 from email.MIMEMultipart import MIMEMultipart
 from email.MIMEText import MIMEText
 
-from neo.tests.functional import NEOCluster
+from neo.lib import logger
 
 MAIL_SERVER = '127.0.0.1:25'
 
@@ -41,13 +41,13 @@ class BenchmarkRunner(object):
         # check specifics arguments
         self._config = AttributeDict()
         self._config.update(self.load_options(options, self._args))
-        self._config.update(dict(
+        self._config.update(
             title = options.title or self.__class__.__name__,
-            verbose = options.verbose,
+            verbose = bool(options.verbose),
             mail_from = options.mail_from,
             mail_to = options.mail_to,
             mail_server = mail_server.split(':'),
-        ))
+        )
 
     def add_status(self, key, value):
         self._status.append((key, value))
@@ -91,6 +91,7 @@ class BenchmarkRunner(object):
         s.close()
 
     def run(self):
+        logger.PACKET_LOGGER.enable(self._config.verbose)
         subject, report = self.start()
         report = self.build_report(report)
         if self._config.mail_to:

Added: trunk/neo/tests/stat_zodb.py
==============================================================================
--- trunk/neo/tests/stat_zodb.py (added)
+++ trunk/neo/tests/stat_zodb.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import math, os, random, sys
+from cStringIO import StringIO
+from ZODB.utils import p64
+from ZODB.BaseStorage import TransactionRecord
+from ZODB.FileStorage import FileStorage
+
+# Stats of a 43.5 GB production Data.fs
+#                          µ               σ
+# size of object           6.04237779991   1.55811487853
+# # objects / transaction  1.04108991045   0.906703192546
+# size of transaction      7.98615420517   1.6624220402
+#
+# % of new object / transaction: 0.810080409164
+# # of transactions: 1541194
+# compression ratio: 28.5 % (gzip -6)
+PROD1 = lambda random=random: DummyZODB(6.04237779991, 1.55811487853,
+                                        1.04108991045, 0.906703192546,
+                                        0.810080409164, random)
+
+def DummyData(random=random):
+    # returns data that gzip at about 28.5 %
+    # make sure sample is bigger than dictionary of compressor
+    data = ''.join(chr(int(random.gauss(0, .8)) % 256) for x in xrange(100000))
+    return StringIO(data)
+
+
+class DummyZODB(object):
+    """
+    Object size and count of generated transaction follows a log normal
+    distribution, where *_mu and *_sigma are their parameters.
+    """
+
+    def __init__(self, obj_size_mu, obj_size_sigma,
+                       obj_count_mu, obj_count_sigma,
+                       new_ratio, random=random):
+        self.obj_size_mu = obj_size_mu
+        self.obj_size_sigma = obj_size_sigma
+        self.obj_count_mu = obj_count_mu
+        self.obj_count_sigma = obj_count_sigma
+        self.random = random
+        self.new_ratio = new_ratio
+        self.next_oid = 0
+        self.err_count = 0
+
+    def __call__(self):
+        variate = self.random.lognormvariate
+        oid_set = set()
+        for i in xrange(int(round(variate(self.obj_count_mu,
+                                          self.obj_count_sigma))) or 1):
+            if len(oid_set) >= self.next_oid or \
+               self.random.random() < self.new_ratio:
+                oid = self.next_oid
+                self.next_oid = oid + 1
+            else:
+                while True:
+                    oid = self.random.randrange(self.next_oid)
+                    if oid not in oid_set:
+                        break
+            oid_set.add(oid)
+            yield p64(oid), int(round(variate(self.obj_size_mu,
+                                              self.obj_size_sigma))) or 1
+
+    def as_storage(self, transaction_count, dummy_data_file=None):
+        if dummy_data_file is None:
+            dummy_data_file = DummyData(self.random)
+        class dummy_change(object):
+            data_txn = None
+            version = ''
+            def __init__(self, tid, oid, size):
+                self.tid = tid
+                self.oid = oid
+                data = ''
+                while size:
+                    d = dummy_data_file.read(size)
+                    size -= len(d)
+                    data += d
+                    if size:
+                        dummy_data_file.seek(0)
+                self.data = data
+        class dummy_transaction(TransactionRecord):
+            def __init__(transaction, *args):
+                TransactionRecord.__init__(transaction, *args)
+                transaction_size = 0
+                transaction.record_list = []
+                add_record = transaction.record_list.append
+                for x in self():
+                    oid, size = x
+                    transaction_size += size
+                    add_record(dummy_change(transaction.tid, oid, size))
+                transaction.size = transaction_size
+            def __iter__(transaction):
+                return iter(transaction.record_list)
+        class dummy_storage(object):
+            size = 0
+            def iterator(storage, *args):
+                args = ' ', '', '', {}
+                for i in xrange(1, transaction_count+1):
+                    t =  dummy_transaction(p64(i), *args)
+                    storage.size += t.size
+                    yield t
+            def getSize(self):
+                return self.size
+        return dummy_storage()
+
+def lognorm_stat(X):
+    Y = map(math.log, X)
+    n = len(Y)
+    mu = sum(Y) / n
+    s2 = sum(d*d for d in (y - mu for y in Y)) / n
+    return mu, math.sqrt(s2)
+
+def stat(*storages):
+    obj_size_list = []
+    obj_count_list = []
+    tr_size_list = []
+    oid_set = set()
+    for storage in storages:
+        for transaction in storage.iterator():
+            obj_count = tr_size = 0
+            for r in transaction:
+                if r.data:
+                    obj_count += 1
+                    oid = r.oid
+                    if oid not in oid_set:
+                        oid_set.add(oid)
+                    size = len(r.data)
+                    tr_size += size
+                    obj_size_list.append(size)
+            obj_count_list.append(obj_count)
+            tr_size_list.append(tr_size)
+    new_ratio = float(len(oid_set)) / len(obj_size_list)
+    return (lognorm_stat(obj_size_list),
+            lognorm_stat(obj_count_list),
+            lognorm_stat(tr_size_list),
+            new_ratio, len(tr_size_list))
+
+def main():
+    s = stat(*(FileStorage(x, read_only=True) for x in sys.argv[1:]))
+    print(u"                         %-15s σ\n"
+           "size of object           %-15s %s\n"
+           "# objects / transaction  %-15s %s\n"
+           "size of transaction      %-15s %s\n"
+           "\n%% of new object / transaction: %s"
+           "\n# of transactions: %s"
+           % ((u"µ",) + s[0] + s[1] + s[2] + s[3:]))
+
+
+if __name__ == "__main__":
+    sys.exit(main())

Propchange: trunk/neo/tests/stat_zodb.py
------------------------------------------------------------------------------
    svn:executable = *
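
For reference, a minimal usage sketch of the new module, assuming only that NEO and ZODB are importable; the PROD1 profile uses the parameters measured on the production Data.fs quoted in the header comment:

    from random import Random
    from neo.tests.stat_zodb import PROD1, stat

    # Synthesize a read-only, iterable storage of 100 transactions whose
    # object sizes and counts follow the measured log-normal profile.
    # Seeding with Random(0) makes the data reproducible across runs.
    storage = PROD1(Random(0)).as_storage(100)

    # stat() re-estimates (mu, sigma) for object size, objects per
    # transaction and transaction size, plus the new-object ratio and
    # the number of transactions.
    print stat(storage)

    # getSize() is cumulative: it reports the bytes generated so far, so
    # it is only meaningful once the storage has been iterated (here, by
    # stat(); in the benchmark tools, by copyTransactionsFrom()).
    print storage.getSize()

The dummy storage only implements what copyTransactionsFrom() and stat() need (iterator() and getSize()), which is enough for the benchmark tools modified below.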

Modified: trunk/neo/tests/threaded/__init__.py
==============================================================================
--- trunk/neo/tests/threaded/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/threaded/__init__.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -16,7 +16,7 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-import os, random, socket, sys, threading, time, types
+import os, random, socket, sys, tempfile, threading, time, types
 from collections import deque
 from functools import wraps
 from Queue import Queue, Empty
@@ -270,6 +270,20 @@ class NeoCTL(neo.neoctl.app.NeoCTL):
                       lambda self, address: setattr(self, '_server', address))
 
 
+class LoggerThreadName(object):
+
+    def __init__(self, default='TEST'):
+        self.__default = default
+
+    def __getattr__(self, attr):
+        return getattr(str(self), attr)
+
+    def __str__(self):
+        try:
+            return threading.currentThread().node_name
+        except AttributeError:
+            return self.__default
+
 class NEOCluster(object):
 
     BaseConnection_checkTimeout = staticmethod(BaseConnection.checkTimeout)
@@ -326,14 +340,21 @@ class NEOCluster(object):
 
     def __init__(self, master_count=1, partitions=1, replicas=0,
                        adapter=os.getenv('NEO_TESTS_ADAPTER', 'BTree'),
-                       storage_count=None, db_list=None,
-                       db_user='neo', db_password='neo'):
+                       storage_count=None, db_list=None, clear_databases=True,
+                       db_user='neo', db_password='neo', verbose=None):
+        if verbose is not None:
+            temp_dir = os.getenv('TEMP') or \
+                os.path.join(tempfile.gettempdir(), 'neo_tests')
+            os.path.exists(temp_dir) or os.makedirs(temp_dir)
+            log_file = tempfile.mkstemp('.log', '', temp_dir)[1]
+            print 'Logging to %r' % log_file
+            setupLog(LoggerThreadName(), log_file, verbose)
         self.name = 'neo_%s' % random.randint(0, 100)
         ip = getVirtualIp('master')
         self.master_nodes = ' '.join('%s:%s' % (ip, i)
                                      for i in xrange(master_count))
         kw = dict(cluster=self, getReplicas=replicas, getPartitions=partitions,
-                  getAdapter=adapter, getReset=True)
+                  getAdapter=adapter, getReset=clear_databases)
         self.master_list = [MasterApplication(address=(ip, i), **kw)
                             for i in xrange(master_count)]
         ip = getVirtualIp('storage')
@@ -383,7 +404,7 @@ class NEOCluster(object):
         self.client = ClientApplication(self)
         self.neoctl = NeoCTL(self)
 
-    def start(self, client=False, storage_list=None, fast_startup=True):
+    def start(self, storage_list=None, fast_startup=True):
         self.__class__._cluster = weak_ref(self)
         for node_type in 'master', 'admin':
             for node in getattr(self, node_type + '_list'):
@@ -401,8 +422,6 @@ class NEOCluster(object):
             self.tic()
         assert self.neoctl.getClusterState() == ClusterStates.RUNNING
         self.enableStorageList(storage_list)
-        if client:
-            self.startClient()
 
     def enableStorageList(self, storage_list):
         self.neoctl.enableStorageList([x.uuid for x in storage_list])
@@ -410,13 +429,16 @@ class NEOCluster(object):
         for node in storage_list:
             assert self.getNodeState(node) == NodeStates.RUNNING
 
-    def startClient(self):
-        self.client.setPoll(True)
-        self.db = ZODB.DB(storage=self.getZODBStorage())
+    @property
+    def db(self):
+        try:
+            return self._db
+        except AttributeError:
+            self._db = db = ZODB.DB(storage=self.getZODBStorage())
+            return db
 
     def stop(self):
-        if hasattr(self, 'db'):
-            self.db.close()
+        getattr(self, '_db', self.client).close()
         #self.neoctl.setClusterState(ClusterStates.STOPPING) # TODO
         try:
             Serialized.release(stop=1)
@@ -446,6 +468,9 @@ class NEOCluster(object):
                      if cell[1] == CellStates.OUT_OF_DATE]
 
     def getZODBStorage(self, **kw):
+        # automatically put client in master mode
+        if self.client.em._timeout == 0:
+            self.client.setPoll(True)
         return Storage.Storage(None, self.name, _app=self.client, **kw)
 
     def getTransaction(self):
@@ -453,17 +478,6 @@ class NEOCluster(object):
         return txn, self.db.open(txn)
 
 
-class LoggerThreadName(object):
-
-    def __getattr__(self, attr):
-        return getattr(str(self), attr)
-
-    def __str__(self):
-        try:
-            return threading.currentThread().node_name
-        except AttributeError:
-            return 'TEST'
-
 class NEOThreadedTest(NeoUnitTestBase):
 
     def setupLog(self):
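
For readers of the threaded test framework, the net effect of these changes is roughly the following sketch, mirroring the test.py changes below (not an exhaustive API description); passing verbose=True to NEOCluster would additionally log to a temporary .log file:

    from neo.tests.threaded import NEOCluster

    cluster = NEOCluster()
    cluster.start()          # no 'client' argument any more
    try:
        # cluster.db is now a lazy property: the first access opens the
        # ZODB.DB, and getZODBStorage() switches the client to poll mode
        # by itself.
        t, c = cluster.getTransaction()
        c.root()['foo'] = 'bar'
        t.commit()
    finally:
        cluster.stop()       # closes the DB if it was opened, else the client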

Modified: trunk/neo/tests/threaded/test.py
==============================================================================
--- trunk/neo/tests/threaded/test.py [iso-8859-1] (original)
+++ trunk/neo/tests/threaded/test.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -29,7 +29,7 @@ class Test(NEOThreadedTest):
 
     def test_commit(self):
         cluster = NEOCluster()
-        cluster.start(1)
+        cluster.start()
         try:
             t, c = cluster.getTransaction()
             c.root()['foo'] = PObject()
@@ -42,7 +42,8 @@ class Test(NEOThreadedTest):
         # (neo.tests.client.testMasterHandler)
         cluster = NEOCluster()
         try:
-            cluster.start(1)
+            cluster.start()
+            cluster.db # open DB
             cluster.client.setPoll(0)
             storage, = cluster.client.nm.getStorageList()
             conn = storage.getConnection()

Modified: trunk/setup.py
==============================================================================
--- trunk/setup.py [iso-8859-1] (original)
+++ trunk/setup.py [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -42,6 +42,7 @@ setup(
             'neostorage=neo.scripts.neostorage:main',
             'neotestrunner=neo.scripts.runner:main',
             'neosimple=neo.scripts.simple:main',
+            'stat_zodb=neo.tests.stat_zodb:main',
         ],
     },
     # Raah!!! I wish I could write something like:
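
The new console_scripts entry makes the statistics tool available as a command after installation; a rough Python equivalent of running "stat_zodb /path/to/Data.fs" (the path is a placeholder):

    import sys
    from neo.tests import stat_zodb

    # Same effect as the installed 'stat_zodb' script invoked with one or
    # more Data.fs paths as arguments.
    sys.argv[1:] = ['/path/to/Data.fs']   # placeholder path
    sys.exit(stat_zodb.main())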

Modified: trunk/tools/matrix
==============================================================================
--- trunk/tools/matrix [iso-8859-1] (original)
+++ trunk/tools/matrix [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -7,7 +7,6 @@ import traceback
 from time import time
 
 from neo.tests.benchmark import BenchmarkRunner
-from neo.tests.functional import NEOCluster
 from ZODB.FileStorage import FileStorage
 
 MIN_STORAGES = 1
@@ -25,9 +24,10 @@ class MatrixImportBenchmark(BenchmarkRun
         parser.add_option('', '--max-storages')
         parser.add_option('', '--min-replicas')
         parser.add_option('', '--max-replicas')
+        parser.add_option('', '--threaded', action="store_true")
 
     def load_options(self, options, args):
-        if not options.datafs or not os.path.exists(options.datafs):
+        if options.datafs and not os.path.exists(options.datafs):
             sys.exit('Missing or wrong data.fs argument')
         return dict(
             datafs = options.datafs,
@@ -35,6 +35,7 @@ class MatrixImportBenchmark(BenchmarkRun
             max_s = int(options.max_storages or MAX_STORAGES),
             min_r = int(options.min_replicas or MIN_REPLICAS),
             max_r = int(options.max_replicas or MAX_REPLICAS),
+            threaded = options.threaded,
         )
 
     def start(self):
@@ -49,42 +50,59 @@ class MatrixImportBenchmark(BenchmarkRun
         if storages[-1] < max_s:
             storages.append(max_s)
         replicas = range(min_r, max_r + 1)
-        results = self.runMatrix(storages, replicas)
+        if self._config.threaded:
+            from neo.tests.threaded import NEOCluster
+            NEOCluster.patch() # XXX ugly
+        try:
+            results = self.runMatrix(storages, replicas)
+        finally:
+            if self._config.threaded:
+                from neo.tests.threaded import NEOCluster
+                NEOCluster.unpatch()# XXX ugly
         return self.buildReport(storages, replicas, results)
 
     def runMatrix(self, storages, replicas):
         stats = {}
-        size = float(os.path.getsize(self._config.datafs))
         for s in storages:
             for r in [r for r in replicas if r < s]:
                 stats.setdefault(s, {})
-                result = self.runImport(1, s, r, 100)
-                if result is not None:
-                    result = size / result / 1024
-                stats[s][r] = result
+                stats[s][r] = self.runImport(1, s, r, 100)
         return stats
 
     def runImport(self, masters, storages, replicas, partitions):
+        datafs = self._config.datafs
+        if datafs:
+            dfs_storage = FileStorage(file_name=self._config.datafs)
+        else:
+            datafs = 'PROD1'
+            import random, neo.tests.stat_zodb
+            dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
+                random.Random(0)).as_storage(100)
         print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
-                self._config.datafs, masters, storages, replicas, partitions)
+                datafs, masters, storages, replicas, partitions)
         # cluster
-        neo = NEOCluster(
+        kw = dict(
             db_list=['neot_matrix_%d' % i for i in xrange(storages)],
             clear_databases=True,
             partitions=partitions,
             replicas=replicas,
-            master_node_count=masters,
             verbose=self._config.verbose,
         )
-        # import
-        neo_storage = neo.getZODBStorage()
-        dfs_storage = FileStorage(file_name=self._config.datafs)
+        if self._config.threaded:
+            from neo.tests.threaded import NEOCluster
+            neo = NEOCluster(master_count=masters, **kw)
+        else:
+            from neo.tests.functional import NEOCluster
+            neo = NEOCluster(master_node_count=masters, **kw)
         neo.start()
+        neo_storage = neo.getZODBStorage()
+        # import
         start = time()
         try:
             try:
                 neo_storage.copyTransactionsFrom(dfs_storage)
-                return time() - start
+                end = time()
+                return dfs_storage.getSize() / ((end - start) * 1e3)
             except:
                 traceback.print_exc()
                 self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
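
Since --datafs is now optional, a matrix cell can be fed entirely from the synthetic PROD1 storage; because it is seeded with Random(0), every run imports exactly the same data, which keeps cells comparable across runs. A quick sketch of that determinism:

    from random import Random
    from neo.tests.stat_zodb import PROD1

    a = PROD1(Random(0)).as_storage(100)
    b = PROD1(Random(0)).as_storage(100)
    # Exhaust both iterators; the generated transactions are identical,
    # so the accumulated sizes match.
    for t in a.iterator(): pass
    for t in b.iterator(): pass
    assert a.getSize() == b.getSize()

Note also that runImport() now returns the import speed directly (getSize() in bytes divided by the elapsed time in milliseconds, i.e. decimal kB/s), so runMatrix() no longer needs the Data.fs size to normalize the result.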

Modified: trunk/tools/perfs
==============================================================================
--- trunk/tools/perfs [iso-8859-1] (original)
+++ trunk/tools/perfs [iso-8859-1] Tue Jun 14 12:04:32 2011
@@ -24,7 +24,7 @@ class ImportBenchmark(BenchmarkRunner):
         parser.add_option('-r', '--replicas')
 
     def load_options(self, options, args):
-        if not options.datafs or not os.path.exists(options.datafs):
+        if options.datafs and not os.path.exists(options.datafs):
             sys.exit('Missing or wrong data.fs argument')
         return dict(
             datafs = options.datafs,
@@ -74,8 +74,12 @@ class ImportBenchmark(BenchmarkRunner):
         # open storages clients
         datafs = self._config.datafs
         neo_storage = neo.getZODBStorage()
-        dfs_storage = FileStorage(file_name=datafs)
-        dfs_size = os.path.getsize(datafs)
+        if datafs:
+            dfs_storage = FileStorage(file_name=datafs)
+        else:
+            from neo.tests.stat_zodb import PROD1
+            from random import Random
+            dfs_storage = PROD1(Random(0)).as_storage(10000)
 
         # monkey patch storage
         txn_dict, obj_dict = {}, {}
@@ -92,13 +96,13 @@ class ImportBenchmark(BenchmarkRunner):
             'Transactions': txn_dict.values(),
             'Objects': obj_dict.values(),
         }
-        return (dfs_size, elapsed, stats)
+        return (dfs_storage.getSize(), elapsed, stats)
 
     def buildReport(self, dfs_size, elapsed, stats):
         """ build a report for the given import data """
         config = self._config
-        dfs_size /= 1024
-        size = dfs_size / 1024
+        dfs_size /= 1e3
+        size = dfs_size / 1e3
         speed = dfs_size / elapsed
 
         # configuration



