[Erp5-report] r11491 - /erp5/trunk/products/ERP5Type/tests/testCache.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Sat Nov 25 18:32:38 CET 2006
Author: jerome
Date: Sat Nov 25 18:32:36 2006
New Revision: 11491
URL: http://svn.erp5.org?rev=11491&view=rev
Log:
testCache was not started due to import errors
Modified:
erp5/trunk/products/ERP5Type/tests/testCache.py
Modified: erp5/trunk/products/ERP5Type/tests/testCache.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/tests/testCache.py?rev=11491&r1=11490&r2=11491&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/tests/testCache.py (original)
+++ erp5/trunk/products/ERP5Type/tests/testCache.py Sat Nov 25 18:32:36 2006
@@ -29,28 +29,34 @@
import random
import unittest
import time
-import base64, md5
-from ERP5Type.CachePlugins.RamCache import RamCache
-from ERP5Type.CachePlugins.DistributedRamCache import DistributedRamCache
-from ERP5Type.CachePlugins.SQLCache import SQLCache
-from ERP5Type.CachePlugins.BaseCache import CacheEntry
+import os
+
+from Products.ERP5Type.CachePlugins.RamCache import RamCache
+from Products.ERP5Type.CachePlugins.DistributedRamCache import\
+ DistributedRamCache
+from Products.ERP5Type.CachePlugins.SQLCache import SQLCache
+from Products.ERP5Type.CachePlugins.BaseCache import CacheEntry
+from Products.ERP5Type.Tool.CacheTool import CacheTool
class Foo:
my_field = (1,2,3,4,5)
class TestRamCache(unittest.TestCase):
-
+ quiet = 1
+
def setUp(self):
- self.cache_plugins = (RamCache(),
+ # for SQLCache, get the connection string from runUnitTest.py parameters,
+ # and use parseDBConnectionString to make it usable by SQLCache
+ mysql_connection_string = os.environ.get(
+ 'erp5_sql_connection_string', 'test test')
+ sql_cache_kw = CacheTool().parseDBConnectionString(mysql_connection_string)
+ sql_cache_kw['cache_table_name'] = 'cache'
+
+ self.cache_plugins = (RamCache(),
DistributedRamCache({'servers': '127.0.0.1:11211',
'debugLevel': 7,}),
- SQLCache( {'server': '',
- 'user': '',
- 'passwd': '',
- 'db': 'test',
- 'cache_table_name': 'cache',
- }),
+ SQLCache( sql_cache_kw ),
)
def testScope(self):
@@ -63,10 +69,12 @@
test_scopes.sort()
## remove DistributedRamCache since it's a flat storage
- filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
+ filtered_cache_plugins = filter(
+ lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
for cache_plugin in filtered_cache_plugins:
- print "TESTING (scope): ", cache_plugin
+ if not self.quiet:
+ print "TESTING (scope): ", cache_plugin
## clear cache for this plugin
cache_plugin.clearCache()
@@ -81,11 +89,12 @@
## we set ONLY one value per scope -> check if we get the same cache_id
self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
- print "\t", cache_id, scope, "\t\tOK"
+ if not self.quiet:
+ print "\t", cache_id, scope, "\t\tOK"
## get list of scopes which must be the same as test_scopes since we clear cache initially
scopes_from_cache = cache_plugin.getScopeList()
- scopes_from_cache.sort()
+ scopes_from_cache.sort()
self.assertEqual(test_scopes, scopes_from_cache)
## remove scope one by one
@@ -108,13 +117,15 @@
self.generaltestSetGet(cache_plugin, 100)
def testExpire(self):
- """ Check expired by setting a key, wit for its timeout and check if in cache"""
+ """ Check expired by setting a key, wit for its timeout and check if in
+ cache"""
for cache_plugin in self.cache_plugins:
self.generalExpire(cache_plugin, 2)
def generalExpire(self, cache_plugin, iterations):
- print "TESTING (expire): ", cache_plugin
+ if not self.quiet:
+ print "TESTING (expire): ", cache_plugin
base_timeout = 1
values = self.prepareValues(iterations)
scope = "peter"
@@ -123,7 +134,8 @@
count = count +1
cache_timeout = base_timeout + random.random()*2
cache_id = "mycache_id_to_expire_%s" %(count)
- print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
+ if not self.quiet:
+ print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
## set to cache
cache_plugin.set(cache_id, scope, value, cache_timeout)
@@ -136,10 +148,12 @@
## check it, we MUST NOT have this key any more in cache
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
- print "\t\tOK"
+ if not self.quiet:
+ print "\t\tOK"
def generaltestSetGet(self, cache_plugin, iterations):
- print "TESTING (set/get/has/del): ", cache_plugin
+ if not self.quiet:
+ print "TESTING (set/get/has/del): ", cache_plugin
values = self.prepareValues(iterations)
cache_duration = 30
scope = "peter"
@@ -150,7 +164,8 @@
## set to cache
cache_plugin.set(cache_id, scope, value, cache_duration)
- print "\t", cache_id,
+ if not self.quiet:
+ print "\t", cache_id,
## check has_key()
self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
@@ -175,7 +190,8 @@
cache_plugin.delete(cache_id, scope)
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
- print "\t\tOK"
+ if not self.quiet:
+ print "\t\tOK"
def prepareValues(self, iterations):
""" generate a big list of values """
More information about the Erp5-report
mailing list