[Erp5-report] r24770 - in /erp5/trunk/products: ERP5/tests/ ERP5Type/patches/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Dec 2 20:03:55 CET 2008
Author: jm
Date: Tue Dec 2 20:03:55 2008
New Revision: 24770
URL: http://svn.erp5.org?rev=24770&view=rev
Log:
* Fix regression causing worklist cache to not work at all.
* Clean testWorklist and add unit test for the use of related keys in worklist.
Note: because related keys are not supported when using erp5_worklist_sql, testSQLCachedWorklist is expected to fail.
Modified:
erp5/trunk/products/ERP5/tests/testWorklist.py
erp5/trunk/products/ERP5Type/patches/WorkflowTool.py
Modified: erp5/trunk/products/ERP5/tests/testWorklist.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testWorklist.py?rev=24770&r1=24769&r2=24770&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/tests/testWorklist.py [utf8] (original)
+++ erp5/trunk/products/ERP5/tests/testWorklist.py [utf8] Tue Dec 2 20:03:55 2008
@@ -57,7 +57,7 @@
worklist_wrong_state_id = '%s_wrong_state' % worklist_owner_id
actbox_wrong_state = '%s_wrong_state' % actbox_owner_name
- worklist_int_variable_id = 'int_value_workflist'
+ worklist_int_variable_id = 'int_value_worklist'
actbox_int_variable_name = 'int_value_todo'
int_catalogued_variable_id = 'int_index'
int_value = 1
@@ -130,16 +130,9 @@
}
return user_dict
- def stepLoginAsFoo(self, sequence=None, sequence_list=None, **kw):
- self.login("foo")
-
- def stepLoginAsBar(self, sequence=None,
- sequence_list=None, **kw):
- self.login("bar")
-
- def createDocument(self):
+ def createDocument(self, **kw):
module = self.getPortal().getDefaultModule(self.checked_portal_type)
- result = module.newContent(portal_type=self.checked_portal_type)
+ result = module.newContent(portal_type=self.checked_portal_type, **kw)
result.setProperty(self.int_catalogued_variable_id, self.int_value)
assert result.getValidationState() == self.checked_validation_state
return result
@@ -158,15 +151,20 @@
# reset aq_dynamic cache
_aq_reset()
- def addWorkflowCataloguedVariable(self):
- workflow = self.getWorkflowTool()[self.checked_workflow]
- workflow.variables.addVariable(self.int_catalogued_variable_id)
- variable = getattr(workflow.variables, self.int_catalogued_variable_id)
-
- def createWorklist(self):
- workflow = self.getWorkflowTool()[self.checked_workflow]
- worklists = workflow.worklists
-
+ def addWorkflowCataloguedVariable(self, workflow_id, variable_id):
+ variables = self.getWorkflowTool()[workflow_id].variables
+ variables.addVariable(variable_id)
+ assert variables[variable_id].for_catalog == 1
+
+ def createWorklist(self, workflow_id, worklist_id, actbox_name, **kw):
+ worklists = self.getWorkflowTool()[workflow_id].worklists
+ worklists.addWorklist(worklist_id)
+ worklists._getOb(worklist_id).setProperties('',
+ actbox_name='%s (%%(count)s)' % actbox_name, props=dict(
+ (k.startswith('guard_') and k or 'var_match_'+k, v)
+ for k, v in kw.iteritems()))
+
+ def createWorklists(self):
for worklist_id, actbox_name, role, expr, state, int_variable in [
(self.worklist_assignor_id, self.actbox_assignor_name,
'Assignor', None, self.checked_validation_state, None),
@@ -181,21 +179,22 @@
(self.worklist_int_variable_id, self.actbox_int_variable_name,
None, None, None, str(self.int_value)),
]:
- worklists.addWorklist(worklist_id)
- worklist_definition = worklists._getOb(worklist_id)
- worklist_definition.setProperties('',
- actbox_name='%s (%%(count)s)' % (actbox_name, ),
- props={'guard_roles': role,
- 'var_match_portal_type': self.checked_portal_type,
- 'var_match_validation_state': state,
- # Variable value is saved as string in worklist
- 'var_match_%s' % self.int_catalogued_variable_id: \
- int_variable,
- 'guard_expr': expr})
+ self.createWorklist(self.checked_workflow, worklist_id, actbox_name,
+ guard_roles=role, guard_expr=expr,
+ portal_type=self.checked_portal_type,
+ validation_state=state,
+ **{self.int_catalogued_variable_id: int_variable})
def clearCache(self):
self.portal.portal_caches.clearAllCache()
+ def checkWorklist(self, result, name, count):
+ entry_list = [x for x in result if x['name'].startswith(name)]
+ self.assertEquals(len(entry_list), count and 1)
+ if count:
+ self.assertEquals(count,
+ self.getWorklistDocumentCountFromActionName(entry_list[0]['name']))
+
def test_01_worklist(self, quiet=0, run=run_all_test):
"""
Test the permission of the building module.
@@ -205,13 +204,14 @@
workflow_tool = self.portal.portal_workflow
- self.logMessage("Create Manager")
+ self.logMessage("Create users")
self.createManagerAndLogin()
self.createUsers()
- self.logMessage("Create worklist")
+ self.logMessage("Create worklists")
self.associatePropertySheet()
- self.addWorkflowCataloguedVariable()
- self.createWorklist()
+ self.addWorkflowCataloguedVariable(self.checked_workflow,
+ self.int_catalogued_variable_id)
+ self.createWorklists()
self.logMessage("Create document as Manager")
document = self.createDocument()
@@ -220,156 +220,43 @@
self.clearCache()
result = workflow_tool.listActions(object=document)
- self.logout()
# Users can not see worklist as they are not Assignor
for user_id in ('manager', ):
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist as Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_name)]
- self.assertEquals(len(entry_list), 0)
+ self.checkWorklist(result, self.actbox_assignor_name, 0)
self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertEquals(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logout()
+ self.checkWorklist(result, self.actbox_owner_name, 1)
for user_id in ('foo', 'bar'):
self.logMessage("Check %s worklist" % user_id)
self.login(user_id)
result = workflow_tool.listActions(object=document)
self.assertEquals(result, [])
- self.logout()
-
- # Define foo as Assignor
- self.login('manager')
- self.logMessage("Give foo Assignor role")
- document.manage_addLocalRoles('foo', ['Assignor'])
- self.logMessage("Give manager Assignor role")
- document.manage_addLocalRoles('manager', ['Assignor'])
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertEquals(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertEquals(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertEquals(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logout()
- for user_id in ('bar', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_name)]
- self.assertEquals(len(entry_list), 0)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logout()
- for user_id in ('foo', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertTrue(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logMessage("Check %s worklist as Owner and Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertEquals(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logout()
-
- # Define foo and bar as Assignee
- self.login('manager')
- self.logMessage("Give foo Assignee role")
- document.manage_addLocalRoles('foo', ['Assignee'])
- self.logMessage("Give bar Assignee role")
- document.manage_addLocalRoles('bar', ['Assignee'])
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- # Users can not see worklist as they are not Assignor
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertTrue(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertTrue(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logout()
- for user_id in ('bar', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_name)]
- self.assertEquals(len(entry_list), 0)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logout()
- for user_id in ('foo', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Assignor" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_assignor_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertTrue(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logout()
+
+ for role, user_id_list in (('Assignor', ('foo', 'manager')),
+ ('Assignee', ('foo', 'bar'))):
+ self.login('manager')
+ for user_id in user_id_list:
+ self.logMessage("Give %s %s role" % (user_id, role))
+ document.manage_addLocalRoles(user_id, [role])
+ document.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ self.clearCache()
+
+ for user_id, assignor, owner, both in (('manager', 1, 1, 1),
+ ('bar' , 0, 0, 0),
+ ('foo' , 1, 0, 1)):
+ self.login(user_id)
+ result = workflow_tool.listActions(object=document)
+ self.logMessage(" Check %s worklist as Assignor" % user_id)
+ self.checkWorklist(result, self.actbox_assignor_name, assignor)
+ self.logMessage(" Check %s worklist as Owner" % user_id)
+ self.checkWorklist(result, self.actbox_owner_name, owner)
+ self.logMessage(" Check %s worklist as Owner and Assignor" % user_id)
+ self.checkWorklist(result, self.actbox_assignor_owner_name, both)
# Check if int variable are managed by the worklist
user_id = 'manager'
@@ -377,9 +264,7 @@
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist with int value as %s" % \
(user_id, self.int_value))
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_int_variable_name)]
- self.assertEquals(len(entry_list), 1)
+ self.checkWorklist(result, self.actbox_int_variable_name, 1)
# Change int value on document
new_value = self.int_value + 1
@@ -391,11 +276,7 @@
result = workflow_tool.listActions(object=document)
self.logMessage("Check %s worklist with int value as %s" % \
(user_id, new_value))
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_int_variable_name)]
- self.assertEquals(len(entry_list), 0)
-
- self.logout()
+ self.checkWorklist(result, self.actbox_int_variable_name, 0)
#
# Check monovalued security role
@@ -413,65 +294,27 @@
bar_assignee_document.manage_setLocalRoles('manager', ['Assignee'])
bar_assignee_document.manage_permission('View', ['Owner', 'Assignee'], 0)
- self.logout()
- self.login('manager')
+ user_id = 'manager'
+ self.login(user_id)
module.manage_delLocalRoles('bar')
- # User can not see worklist as user can not view the document
- document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
- document.manage_permission('View', [], 0)
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logout()
-
- # User can not see worklist as Owner can not view the document
- document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
- document.manage_permission('View', ['Assignee'], 0)
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logout()
-
- # User can see worklist as Owner can view the document
- document.manage_permission('View', ['Owner', 'Assignee'], 0)
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertTrue(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logout()
+ def test(*count_list):
+ local_role_list = 'Assignee', 'Owner'
+ document.manage_setLocalRoles('manager', local_role_list)
+
+ for i, count in enumerate(count_list):
+ document.manage_permission('View', local_role_list[:i], 0)
+ document.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ self.clearCache()
+
+ result = workflow_tool.listActions(object=document)
+ self.logMessage("Check %s worklist as Owner (%s)" % (user_id, count))
+ self.checkWorklist(result, self.actbox_owner_name, count)
+
+ test(0, 0, 1)
# Define a local role key
sql_catalog = self.portal.portal_catalog.getSQLCatalog()
@@ -482,67 +325,56 @@
self.portal.portal_caches.clearAllCache()
try:
-
- # User can not see worklist as user can not view the document
- document.manage_setLocalRoles('manager', ['Owner', 'Assignee'])
- document.manage_permission('View', [], 0)
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 0)
- self.logout()
-
- # User can see worklist as Assignee can view the document
- document.manage_permission('View', ['Assignee'], 0)
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertTrue(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logout()
-
- # User can see worklist as Owner can view the document
- document.manage_permission('View', ['Owner', 'Assignee'], 0)
- document.reindexObject()
- get_transaction().commit()
- self.tic()
- self.clearCache()
- self.logout()
-
- for user_id in ('manager', ):
- self.login(user_id)
- result = workflow_tool.listActions(object=document)
- self.logMessage("Check %s worklist as Owner" % user_id)
- entry_list = [x for x in result \
- if x['name'].startswith(self.actbox_owner_name)]
- self.assertEquals(len(entry_list), 1)
- self.assertTrue(
- self.getWorklistDocumentCountFromActionName(entry_list[0]['name']), 1)
- self.logout()
+ test(0, 1, 1)
finally:
sql_catalog.sql_catalog_local_role_keys = \
current_sql_catalog_local_role_keys
get_transaction().commit()
+ #
+ # Test related keys
+ #
+ self.logMessage("Test related keys")
+ self.addWorkflowCataloguedVariable(self.checked_workflow,
+ 'base_category_id')
+
+ for base_category, category_list in (
+ ('region', ('somewhere', 'elsewhere')),
+ ('role', ('client', 'supplier'))):
+ newContent = self.getCategoryTool()[base_category].newContent
+ for category in category_list:
+ newContent(portal_type='Category', id=category)
+
+ self.createWorklist(self.checked_workflow, 'region_worklist', 'has_region',
+ portal_type=self.checked_portal_type,
+ base_category_id='region')
+ self.createWorklist(self.checked_workflow, 'role_worklist', 'has_role',
+ portal_type=self.checked_portal_type,
+ base_category_id='role')
+
+ get_transaction().commit()
+ self.tic()
+ self.clearCache()
+ self.logMessage(" Check no document has region/role categories defined")
+ result = workflow_tool.listActions(object=document)
+ self.checkWorklist(result, 'has_region', 0)
+ self.checkWorklist(result, 'has_role', 0)
+
+ self.logMessage(" Creates documents with region/role categories defined")
+ self.createDocument(role='client')
+ self.createDocument(region='somewhere')
+ self.createDocument(region='elsewhere')
+
+ get_transaction().commit()
+ self.tic()
+ self.clearCache()
+ self.logMessage(
+ " Check there are documents with region/role categories defined")
+ result = workflow_tool.listActions(object=document)
+ self.checkWorklist(result, 'has_region', 2)
+ self.checkWorklist(result, 'has_role', 1)
+
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestWorklist))
Modified: erp5/trunk/products/ERP5Type/patches/WorkflowTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/patches/WorkflowTool.py?rev=24770&r1=24769&r2=24770&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/patches/WorkflowTool.py [utf8] (original)
+++ erp5/trunk/products/ERP5Type/patches/WorkflowTool.py [utf8] Tue Dec 2 20:03:55 2008
@@ -449,11 +449,12 @@
portal_url = getToolByName(self, 'portal_url')()
portal_catalog = getToolByName(self, 'portal_catalog')
search_result = getattr(self, "Base_getCountFromWorklistTable", None)
- if search_result is None:
+ use_cache = search_result is not None
+ if use_cache:
+ select_expression_prefix = 'sum(`%s`) as %s' % (COUNT_COLUMN_TITLE, COUNT_COLUMN_TITLE)
+ else:
search_result = portal_catalog.unrestrictedSearchResults
select_expression_prefix = 'count(*) as %s' % (COUNT_COLUMN_TITLE, )
- else:
- select_expression_prefix = 'sum(`%s`) as %s' % (COUNT_COLUMN_TITLE, COUNT_COLUMN_TITLE)
getSecurityUidListAndRoleColumnDict = \
portal_catalog.getSecurityUidListAndRoleColumnDict
security_query_cache_dict = {}
@@ -486,7 +487,10 @@
select_expression = [select_expression_prefix]
for criterion_id in total_criterion_id_list:
mapped_key = acceptable_key_dict[criterion_id]
- if isinstance(mapped_key, str): # related key
+ if use_cache: # no support for related keys
+ select_expression.append(criterion_id)
+ continue
+ elif isinstance(mapped_key, str): # related key
mapped_key = mapped_key.split('/')
related_table_map_dict[criterion_id] = table_alias_list = tuple(
(table_id, '%s_%s' % (criterion_id, i))
More information about the Erp5-report
mailing list