[Erp5-report] r24770 - in /erp5/trunk/products: ERP5/tests/ ERP5Type/patches/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Dec 2 20:03:55 CET 2008


Author: jm
Date: Tue Dec  2 20:03:55 2008
New Revision: 24770

URL: http://svn.erp5.org?rev=24770&view=rev
Log:
* Fix regression causing worklist cache to not work at all.
* Clean up testWorklist and add a unit test for the use of related keys in worklists.

Note: related keys are not supported when erp5_worklist_sql (the SQL worklist cache) is used, so testSQLCachedWorklist is expected to fail on the new related-key checks.

Modified:
    erp5/trunk/products/ERP5/tests/testWorklist.py
    erp5/trunk/products/ERP5Type/patches/WorkflowTool.py

Modified: erp5/trunk/products/ERP5/tests/testWorklist.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testWorklist.py?rev=24770&r1=24769&r2=24770&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/tests/testWorklist.py [utf8] (original)
+++ erp5/trunk/products/ERP5/tests/testWorklist.py [utf8] Tue Dec  2 20:03:55 2008
@@ -57,7 +57,7 @@
   worklist_wrong_state_id = '%s_wrong_state' % worklist_owner_id
   actbox_wrong_state = '%s_wrong_state' % actbox_owner_name
 
-  worklist_int_variable_id = 'int_value_workflist'
+  worklist_int_variable_id = 'int_value_worklist'
   actbox_int_variable_name = 'int_value_todo'
   int_catalogued_variable_id = 'int_index'
   int_value = 1
@@ -130,16 +130,9 @@
     }
     return user_dict
 
-  def stepLoginAsFoo(self, sequence=None, sequence_list=None, **kw):
-    self.login("foo")
-
-  def stepLoginAsBar(self, sequence=None,
-                       sequence_list=None, **kw):
-    self.login("bar")
-
-  def createDocument(self):
+  def createDocument(self, **kw):
     module = self.getPortal().getDefaultModule(self.checked_portal_type)
-    result = module.newContent(portal_type=self.checked_portal_type)
+    result = module.newContent(portal_type=self.checked_portal_type, **kw)
     result.setProperty(self.int_catalogued_variable_id, self.int_value)
     assert result.getValidationState() == self.checked_validation_state
     return result
@@ -158,15 +151,20 @@
     # reset aq_dynamic cache
     _aq_reset()
 
-  def addWorkflowCataloguedVariable(self):
-    workflow = self.getWorkflowTool()[self.checked_workflow]
-    workflow.variables.addVariable(self.int_catalogued_variable_id)
-    variable = getattr(workflow.variables, self.int_catalogued_variable_id)
-
-  def createWorklist(self):
-    workflow = self.getWorkflowTool()[self.checked_workflow]
-    worklists = workflow.worklists
-
+  def addWorkflowCataloguedVariable(self, workflow_id, variable_id):
+    variables = self.getWorkflowTool()[workflow_id].variables
+    variables.addVariable(variable_id)
+    assert variables[variable_id].for_catalog == 1
+
+  def createWorklist(self, workflow_id, worklist_id, actbox_name, **kw):
+    worklists = self.getWorkflowTool()[workflow_id].worklists
+    worklists.addWorklist(worklist_id)
+    worklists._getOb(worklist_id).setProperties('',
+        actbox_name='%s (%%(count)s)' % actbox_name, props=dict(
+          (k.startswith('guard_') and k or 'var_match_'+k, v)
+          for k, v in kw.iteritems()))
+
+  def createWorklists(self):
     for worklist_id, actbox_name, role, expr, state, int_variable in [
           (self.worklist_assignor_id, self.actbox_assignor_name, 
            'Assignor', None, self.checked_validation_state, None),
@@ -181,21 +179,22 @@
           (self.worklist_int_variable_id, self.actbox_int_variable_name, 
            None, None, None, str(self.int_value)),
     ]:
-      worklists.addWorklist(worklist_id)
-      worklist_definition = worklists._getOb(worklist_id)
-      worklist_definition.setProperties('',
-          actbox_name='%s (%%(count)s)' % (actbox_name, ),
-          props={'guard_roles': role,
-                 'var_match_portal_type': self.checked_portal_type,
-                 'var_match_validation_state': state,
-                 # Variable value is saved as string in worklist
-                 'var_match_%s' % self.int_catalogued_variable_id: \
-                                                            int_variable,
-                  'guard_expr': expr})
+      self.createWorklist(self.checked_workflow, worklist_id, actbox_name,
+                          guard_roles=role, guard_expr=expr,
+                          portal_type=self.checked_portal_type,
+                          validation_state=state,
+                          **{self.int_catalogued_variable_id: int_variable})
 
   def clearCache(self):
     self.portal.portal_caches.clearAllCache()
 
+  def checkWorklist(self, result, name, count):
+    entry_list = [x for x in result if x['name'].startswith(name)]
+    self.assertEquals(len(entry_list), count and 1)
+    if count:
+      self.assertEquals(count,
+        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']))
+
   def test_01_worklist(self, quiet=0, run=run_all_test):
     """
     Test the permission of the building module.
@@ -205,13 +204,14 @@
 
     workflow_tool = self.portal.portal_workflow
 
-    self.logMessage("Create Manager")
+    self.logMessage("Create users")
     self.createManagerAndLogin()
     self.createUsers()
-    self.logMessage("Create worklist")
+    self.logMessage("Create worklists")
     self.associatePropertySheet()
-    self.addWorkflowCataloguedVariable()
-    self.createWorklist()
+    self.addWorkflowCataloguedVariable(self.checked_workflow,
+                                       self.int_catalogued_variable_id)
+    self.createWorklists()
     self.logMessage("Create document as Manager")
     document = self.createDocument()
 
@@ -220,156 +220,43 @@
     self.clearCache()
 
     result = workflow_tool.listActions(object=document)
-    self.logout()
 
     # Users can not see worklist as they are not Assignor
     for user_id in ('manager', ):
       self.login(user_id)
       result = workflow_tool.listActions(object=document)
       self.logMessage("Check %s worklist as Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_name)]
-      self.assertEquals(len(entry_list), 0)
+      self.checkWorklist(result, self.actbox_assignor_name, 0)
       self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertEquals(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logout()
+      self.checkWorklist(result, self.actbox_owner_name, 1)
     for user_id in ('foo', 'bar'):
       self.logMessage("Check %s worklist" % user_id)
       self.login(user_id)
       result = workflow_tool.listActions(object=document)
       self.assertEquals(result, [])
-      self.logout()
-
-    # Define foo as Assignor
-    self.login('manager')
-    self.logMessage("Give foo Assignor role")
-    document.manage_addLocalRoles('foo', ['Assignor'])
-    self.logMessage("Give manager Assignor role")
-    document.manage_addLocalRoles('manager', ['Assignor'])
-    document.reindexObject()
-    get_transaction().commit()
-    self.tic()
-    self.clearCache()
-    self.logout()
-
-    for user_id in ('manager', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertEquals(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertEquals(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_owner_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertEquals(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logout()
-    for user_id in ('bar', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_owner_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logout()
-    for user_id in ('foo', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertTrue(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_owner_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertEquals(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logout()
-
-    # Define foo and bar as Assignee
-    self.login('manager')
-    self.logMessage("Give foo Assignee role")
-    document.manage_addLocalRoles('foo', ['Assignee'])
-    self.logMessage("Give bar Assignee role")
-    document.manage_addLocalRoles('bar', ['Assignee'])
-    document.reindexObject()
-    get_transaction().commit()
-    self.tic()
-    self.clearCache()
-    self.logout()
-
-    # Users can not see worklist as they are not Assignor
-    for user_id in ('manager', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertTrue(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertTrue(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logout()
-    for user_id in ('bar', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logout()
-    for user_id in ('foo', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Assignor" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_assignor_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertTrue(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logout()
+
+    for role, user_id_list in (('Assignor', ('foo', 'manager')),
+                               ('Assignee', ('foo', 'bar'))):
+      self.login('manager')
+      for user_id in user_id_list:
+        self.logMessage("Give %s %s role" % (user_id, role))
+        document.manage_addLocalRoles(user_id, [role])
+      document.reindexObject()
+      get_transaction().commit()
+      self.tic()
+      self.clearCache()
+
+      for user_id, assignor, owner, both in (('manager', 1, 1, 1),
+                                             ('bar'    , 0, 0, 0),
+                                             ('foo'    , 1, 0, 1)):
+        self.login(user_id)
+        result = workflow_tool.listActions(object=document)
+        self.logMessage("  Check %s worklist as Assignor" % user_id)
+        self.checkWorklist(result, self.actbox_assignor_name, assignor)
+        self.logMessage("  Check %s worklist as Owner" % user_id)
+        self.checkWorklist(result, self.actbox_owner_name, owner)
+        self.logMessage("  Check %s worklist as Owner and Assignor" % user_id)
+        self.checkWorklist(result, self.actbox_assignor_owner_name, both)
 
     # Check if int variable are managed by the worklist
     user_id = 'manager'
@@ -377,9 +264,7 @@
     result = workflow_tool.listActions(object=document)
     self.logMessage("Check %s worklist with int value as %s" % \
                                    (user_id, self.int_value))
-    entry_list = [x for x in result \
-                  if x['name'].startswith(self.actbox_int_variable_name)]
-    self.assertEquals(len(entry_list), 1)
+    self.checkWorklist(result, self.actbox_int_variable_name, 1)
 
     # Change int value on document
     new_value = self.int_value + 1
@@ -391,11 +276,7 @@
     result = workflow_tool.listActions(object=document)
     self.logMessage("Check %s worklist with int value as %s" % \
                                    (user_id, new_value))
-    entry_list = [x for x in result \
-                  if x['name'].startswith(self.actbox_int_variable_name)]
-    self.assertEquals(len(entry_list), 0)
-
-    self.logout()
+    self.checkWorklist(result, self.actbox_int_variable_name, 0)
 
     #
     # Check monovalued security role
@@ -413,65 +294,27 @@
     bar_assignee_document.manage_setLocalRoles('manager', ['Assignee'])
     bar_assignee_document.manage_permission('View', ['Owner', 'Assignee'], 0)
 
-    self.logout()
-    self.login('manager')
+    user_id = 'manager'
+    self.login(user_id)
 
     module.manage_delLocalRoles('bar')
 
-    # User can not see worklist as user can not view the document
-    document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
-    document.manage_permission('View', [], 0)
-    document.reindexObject()
-    get_transaction().commit()
-    self.tic()
-    self.clearCache()
-    self.logout()
-
-    for user_id in ('manager', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logout()
-
-    # User can not see worklist as Owner can not view the document
-    document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
-    document.manage_permission('View', ['Assignee'], 0)
-    document.reindexObject()
-    get_transaction().commit()
-    self.tic()
-    self.clearCache()
-    self.logout()
-
-    for user_id in ('manager', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 0)
-      self.logout()
-
-    # User can see worklist as Owner can view the document
-    document.manage_permission('View', ['Owner', 'Assignee'], 0)
-    document.reindexObject()
-    get_transaction().commit()
-    self.tic()
-    self.clearCache()
-    self.logout()
-
-    for user_id in ('manager', ):
-      self.login(user_id)
-      result = workflow_tool.listActions(object=document)
-      self.logMessage("Check %s worklist as Owner" % user_id)
-      entry_list = [x for x in result \
-                    if x['name'].startswith(self.actbox_owner_name)]
-      self.assertEquals(len(entry_list), 1)
-      self.assertTrue(
-        self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-      self.logout()
+    def test(*count_list):
+      local_role_list = 'Assignee', 'Owner'
+      document.manage_setLocalRoles('manager', local_role_list)
+
+      for i, count in enumerate(count_list):
+        document.manage_permission('View', local_role_list[:i], 0)
+        document.reindexObject()
+        get_transaction().commit()
+        self.tic()
+        self.clearCache()
+
+        result = workflow_tool.listActions(object=document)
+        self.logMessage("Check %s worklist as Owner (%s)" % (user_id, count))
+        self.checkWorklist(result, self.actbox_owner_name, count)
+
+    test(0, 0, 1)
 
     # Define a local role key
     sql_catalog = self.portal.portal_catalog.getSQLCatalog()
@@ -482,67 +325,56 @@
     self.portal.portal_caches.clearAllCache()
 
     try:
-
-      # User can not see worklist as user can not view the document
-      document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
-      document.manage_permission('View', [], 0)
-      document.reindexObject()
-      get_transaction().commit()
-      self.tic()
-      self.clearCache()
-      self.logout()
-
-      for user_id in ('manager', ):
-        self.login(user_id)
-        result = workflow_tool.listActions(object=document)
-        self.logMessage("Check %s worklist as Owner" % user_id)
-        entry_list = [x for x in result \
-                      if x['name'].startswith(self.actbox_owner_name)]
-        self.assertEquals(len(entry_list), 0)
-        self.logout()
-
-      # User can see worklist as Assignee can view the document
-      document.manage_permission('View', ['Assignee'], 0)
-      document.reindexObject()
-      get_transaction().commit()
-      self.tic()
-      self.clearCache()
-      self.logout()
-
-      for user_id in ('manager', ):
-        self.login(user_id)
-        result = workflow_tool.listActions(object=document)
-        self.logMessage("Check %s worklist as Owner" % user_id)
-        entry_list = [x for x in result \
-                      if x['name'].startswith(self.actbox_owner_name)]
-        self.assertEquals(len(entry_list), 1)
-        self.assertTrue(
-          self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-        self.logout()
-
-      # User can see worklist as Owner can view the document
-      document.manage_permission('View', ['Owner', 'Assignee'], 0)
-      document.reindexObject()
-      get_transaction().commit()
-      self.tic()
-      self.clearCache()
-      self.logout()
-
-      for user_id in ('manager', ):
-        self.login(user_id)
-        result = workflow_tool.listActions(object=document)
-        self.logMessage("Check %s worklist as Owner" % user_id)
-        entry_list = [x for x in result \
-                      if x['name'].startswith(self.actbox_owner_name)]
-        self.assertEquals(len(entry_list), 1)
-        self.assertTrue(
-          self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
-        self.logout()
+      test(0, 1, 1)
     finally:
       sql_catalog.sql_catalog_local_role_keys = \
           current_sql_catalog_local_role_keys
       get_transaction().commit()
 
+    #
+    # Test related keys
+    #
+    self.logMessage("Test related keys")
+    self.addWorkflowCataloguedVariable(self.checked_workflow,
+                                       'base_category_id')
+
+    for base_category, category_list in (
+        ('region', ('somewhere', 'elsewhere')),
+        ('role',   ('client',    'supplier'))):
+      newContent = self.getCategoryTool()[base_category].newContent
+      for category in category_list:
+        newContent(portal_type='Category', id=category)
+
+    self.createWorklist(self.checked_workflow, 'region_worklist', 'has_region',
+                        portal_type=self.checked_portal_type,
+                        base_category_id='region')
+    self.createWorklist(self.checked_workflow, 'role_worklist', 'has_role',
+                        portal_type=self.checked_portal_type,
+                        base_category_id='role')
+
+    get_transaction().commit()
+    self.tic()
+    self.clearCache()
+    self.logMessage("  Check no document has region/role categories defined")
+    result = workflow_tool.listActions(object=document)
+    self.checkWorklist(result, 'has_region', 0)
+    self.checkWorklist(result, 'has_role', 0)
+
+    self.logMessage("  Creates documents with region/role categories defined")
+    self.createDocument(role='client')
+    self.createDocument(region='somewhere')
+    self.createDocument(region='elsewhere')
+
+    get_transaction().commit()
+    self.tic()
+    self.clearCache()
+    self.logMessage(
+             "  Check there are documents with region/role categories defined")
+    result = workflow_tool.listActions(object=document)
+    self.checkWorklist(result, 'has_region', 2)
+    self.checkWorklist(result, 'has_role', 1)
+
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestWorklist))

Modified: erp5/trunk/products/ERP5Type/patches/WorkflowTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/patches/WorkflowTool.py?rev=24770&r1=24769&r2=24770&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/patches/WorkflowTool.py [utf8] (original)
+++ erp5/trunk/products/ERP5Type/patches/WorkflowTool.py [utf8] Tue Dec  2 20:03:55 2008
@@ -449,11 +449,12 @@
     portal_url = getToolByName(self, 'portal_url')()
     portal_catalog = getToolByName(self, 'portal_catalog')
     search_result = getattr(self, "Base_getCountFromWorklistTable", None)
-    if search_result is None:
+    use_cache = search_result is not None
+    if use_cache:
+      select_expression_prefix = 'sum(`%s`) as %s' % (COUNT_COLUMN_TITLE, COUNT_COLUMN_TITLE)
+    else:
       search_result = portal_catalog.unrestrictedSearchResults
       select_expression_prefix = 'count(*) as %s' % (COUNT_COLUMN_TITLE, )
-    else:
-      select_expression_prefix = 'sum(`%s`) as %s' % (COUNT_COLUMN_TITLE, COUNT_COLUMN_TITLE)
     getSecurityUidListAndRoleColumnDict = \
       portal_catalog.getSecurityUidListAndRoleColumnDict
     security_query_cache_dict = {}
@@ -486,7 +487,10 @@
         select_expression = [select_expression_prefix]
         for criterion_id in total_criterion_id_list:
           mapped_key = acceptable_key_dict[criterion_id]
-          if isinstance(mapped_key, str): # related key
+          if use_cache: # no support for related keys
+            select_expression.append(criterion_id)
+            continue
+          elif isinstance(mapped_key, str): # related key
             mapped_key = mapped_key.split('/')
             related_table_map_dict[criterion_id] = table_alias_list = tuple(
               (table_id, '%s_%s' % (criterion_id, i))




More information about the Erp5-report mailing list