[Erp5-report] r11491 - /erp5/trunk/products/ERP5Type/tests/testCache.py

nobody at svn.erp5.org nobody at svn.erp5.org
Sat Nov 25 18:32:38 CET 2006


Author: jerome
Date: Sat Nov 25 18:32:36 2006
New Revision: 11491

URL: http://svn.erp5.org?rev=11491&view=rev
Log:
testCache was not started due to import errors


Modified:
    erp5/trunk/products/ERP5Type/tests/testCache.py

Modified: erp5/trunk/products/ERP5Type/tests/testCache.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/tests/testCache.py?rev=11491&r1=11490&r2=11491&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/tests/testCache.py (original)
+++ erp5/trunk/products/ERP5Type/tests/testCache.py Sat Nov 25 18:32:36 2006
@@ -29,28 +29,34 @@
 import random
 import unittest
 import time
-import base64, md5
-from ERP5Type.CachePlugins.RamCache import RamCache
-from ERP5Type.CachePlugins.DistributedRamCache import DistributedRamCache
-from ERP5Type.CachePlugins.SQLCache import SQLCache
-from ERP5Type.CachePlugins.BaseCache import CacheEntry
+import os
+
+from Products.ERP5Type.CachePlugins.RamCache import RamCache
+from Products.ERP5Type.CachePlugins.DistributedRamCache import\
+                                              DistributedRamCache
+from Products.ERP5Type.CachePlugins.SQLCache import SQLCache
+from Products.ERP5Type.CachePlugins.BaseCache import CacheEntry
+from Products.ERP5Type.Tool.CacheTool import CacheTool
 
 
 class Foo:
   my_field = (1,2,3,4,5)
 
 class TestRamCache(unittest.TestCase):
-    
+  quiet = 1
+
   def setUp(self):
-    self.cache_plugins = (RamCache(), 
+    # for SQLCache, get the connection string from runUnitTest.py parameters,
+    # and use parseDBConnectionString to make it usable by SQLCache
+    mysql_connection_string = os.environ.get(
+          'erp5_sql_connection_string', 'test test')
+    sql_cache_kw = CacheTool().parseDBConnectionString(mysql_connection_string)
+    sql_cache_kw['cache_table_name'] = 'cache'
+
+    self.cache_plugins = (RamCache(),
                           DistributedRamCache({'servers': '127.0.0.1:11211',
                                                  'debugLevel': 7,}),
-                          SQLCache( {'server': '',
-                                     'user': '',
-                                     'passwd': '',
-                                     'db': 'test',
-                                     'cache_table_name': 'cache',
-                                      }),
+                          SQLCache( sql_cache_kw ),
                         )
 
   def testScope(self):
@@ -63,10 +69,12 @@
     test_scopes.sort()
     
     ## remove DistributedRamCache since it's a flat storage
-    filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
+    filtered_cache_plugins = filter(
+        lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
     
     for cache_plugin in filtered_cache_plugins:
-      print "TESTING (scope): ", cache_plugin
+      if not self.quiet:
+        print "TESTING (scope): ", cache_plugin
 
       ## clear cache for this plugin
       cache_plugin.clearCache()
@@ -81,11 +89,12 @@
         
         ## we set ONLY one value per scope -> check if we get the same cache_id
         self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
-        print "\t", cache_id, scope, "\t\tOK"
+        if not self.quiet:
+          print "\t", cache_id, scope, "\t\tOK"
       
       ## get list of scopes which must be the same as test_scopes since we clear cache initially
       scopes_from_cache = cache_plugin.getScopeList()
-      scopes_from_cache.sort()  
+      scopes_from_cache.sort()
       self.assertEqual(test_scopes, scopes_from_cache)
       
       ## remove scope one by one
@@ -108,13 +117,15 @@
       self.generaltestSetGet(cache_plugin, 100)
     
   def testExpire(self):
-    """ Check expired by setting a key, wit for its timeout and check if in cache"""
+    """ Check expired by setting a key, wait for its timeout and check if in
+    cache"""
     for cache_plugin in self.cache_plugins:
       self.generalExpire(cache_plugin, 2)
 
             
   def generalExpire(self, cache_plugin, iterations):
-    print "TESTING (expire): ", cache_plugin
+    if not self.quiet:
+      print "TESTING (expire): ", cache_plugin
     base_timeout = 1
     values = self.prepareValues(iterations)
     scope = "peter"
@@ -123,7 +134,8 @@
       count = count +1
       cache_timeout = base_timeout + random.random()*2
       cache_id = "mycache_id_to_expire_%s" %(count)
-      print "\t", cache_id, " ==> timeout (s) = ", cache_timeout, 
+      if not self.quiet:
+        print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
 
       ## set to cache
       cache_plugin.set(cache_id, scope, value, cache_timeout)
@@ -136,10 +148,12 @@
         
       ##  check it, we MUST NOT have this key any more in cache
       self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
-      print "\t\tOK"
+      if not self.quiet:
+        print "\t\tOK"
      
   def generaltestSetGet(self, cache_plugin, iterations):
-    print "TESTING (set/get/has/del): ", cache_plugin
+    if not self.quiet:
+      print "TESTING (set/get/has/del): ", cache_plugin
     values = self.prepareValues(iterations)
     cache_duration = 30
     scope = "peter"
@@ -150,7 +164,8 @@
         
       ## set to cache
       cache_plugin.set(cache_id, scope, value, cache_duration)
-      print "\t", cache_id, 
+      if not self.quiet:
+        print "\t", cache_id,
         
       ## check has_key()
       self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
@@ -175,7 +190,8 @@
       cache_plugin.delete(cache_id, scope)
       self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
         
-      print "\t\tOK"
+      if not self.quiet:
+        print "\t\tOK"
     
   def prepareValues(self, iterations):
     """ generate a big list of values """




More information about the Erp5-report mailing list