[Erp5-report] r17364 - /erp5/trunk/products/ERP5Type/patches/WorkflowTool.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Nov 2 15:03:13 CET 2007
Author: vincent
Date: Fri Nov 2 15:03:12 2007
New Revision: 17364
URL: http://svn.erp5.org?rev=17364&view=rev
Log:
Allow getSecurityUidListAndRoleColumnDict to be None in groupWorklistListByCondition (part 1 of 2)
- update prototype
- add test
- add methods to factorize code
- add method calls in new code branch
- indent existing code branch (to make changes in this branch obvious in next commit)
Modified:
erp5/trunk/products/ERP5Type/patches/WorkflowTool.py
Modified: erp5/trunk/products/ERP5Type/patches/WorkflowTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Type/patches/WorkflowTool.py?rev=17364&r1=17363&r2=17364&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Type/patches/WorkflowTool.py (original)
+++ erp5/trunk/products/ERP5Type/patches/WorkflowTool.py Fri Nov 2 15:03:12 2007
@@ -95,8 +95,38 @@
"""
pass
+def getValidCriterionDict(worklist_match_dict, acceptable_key_dict):
+ valid_criterion_dict = {}
+ metadata = None
+ for criterion_id, criterion_value in worklist_match_dict.iteritems():
+ if criterion_id in acceptable_key_dict:
+ if isinstance(criterion_value, tuple):
+ criterion_value = list(criterion_value)
+ assert criterion_id not in valid_criterion_dict
+ valid_criterion_dict[criterion_id] = criterion_value
+ elif criterion_id == WORKLIST_METADATA_KEY:
+ metadata = criterion_value
+ elif criterion_id == SECURITY_PARAMETER_ID:
+ pass
+ else:
+ LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \
+ 'workflow %s filters on variable %s which is not available ' \
+ 'in catalog. Its value will not be checked.' % \
+ (worklist_id, workflow_id, criterion_id))
+ return valid_criterion_dict, metadata
+
+def updateWorklistSetDict(worklist_set_dict, workflow_worklist_key, valid_criterion_dict):
+ worklist_set_dict_key = valid_criterion_dict.keys()
+ if len(worklist_set_dict_key):
+ worklist_set_dict_key.sort()
+ worklist_set_dict_key = tuple(worklist_set_dict_key)
+ if worklist_set_dict_key not in worklist_set_dict:
+ worklist_set_dict[worklist_set_dict_key] = {}
+ worklist_set_dict[worklist_set_dict_key]\
+ [workflow_worklist_key] = valid_criterion_dict
+
def groupWorklistListByCondition(worklist_dict, acceptable_key_dict,
- getSecurityUidListAndRoleColumnDict):
+ getSecurityUidListAndRoleColumnDict=None):
"""
Get a list of dict of WorklistVariableMatchDict grouped by compatible
conditions.
@@ -133,56 +163,67 @@
for workflow_id, worklist in worklist_dict.iteritems():
for worklist_id, worklist_match_dict in worklist.iteritems():
workflow_worklist_key = '/'.join((workflow_id, worklist_id))
- security_parameter = worklist_match_dict.get(SECURITY_PARAMETER_ID, [])
- security_kw = {}
- if len(security_parameter):
- security_kw[SECURITY_PARAMETER_ID] = security_parameter
- uid_list, role_column_dict = getSecurityUidListAndRoleColumnDict(
- **security_kw)
- if len(uid_list):
- uid_list.sort()
- role_column_dict[SECURITY_COLUMN_ID] = uid_list
- # Make sure every item is a list - or a tuple
- for security_column_id in role_column_dict.iterkeys():
- value = role_column_dict[security_column_id]
- if not isinstance(value, (tuple, list)):
- role_column_dict[security_column_id] = [value]
- applied_security_criterion_dict = {}
- # TODO: make security criterions be examined in the same order for all
- # worklists if possible at all.
- for security_column_id, security_column_value in \
- role_column_dict.iteritems():
- valid_criterion_dict = {}
- valid_criterion_dict.update(applied_security_criterion_dict)
- # Current security criterion must be applied to all further queries
- # for this worklist negated, so the a given line cannot match multiple
- # times.
- applied_security_criterion_dict[security_column_id] = \
- ExclusionList(security_column_value)
- valid_criterion_dict[security_column_id] = security_column_value
- for criterion_id, criterion_value in worklist_match_dict.iteritems():
- if criterion_id in acceptable_key_dict:
- if isinstance(criterion_value, tuple):
- criterion_value = list(criterion_value)
- assert criterion_id not in valid_criterion_dict
- valid_criterion_dict[criterion_id] = criterion_value
- elif criterion_id == WORKLIST_METADATA_KEY:
- metadata_dict[workflow_worklist_key] = criterion_value
- elif criterion_id == SECURITY_PARAMETER_ID:
- pass
- else:
- LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \
- 'workflow %s filters on variable %s which is not available ' \
- 'in catalog. Its value will not be checked.' % \
- (worklist_id, workflow_id, criterion_id))
- worklist_set_dict_key = valid_criterion_dict.keys()
- if len(worklist_set_dict_key):
- worklist_set_dict_key.sort()
- worklist_set_dict_key = tuple(worklist_set_dict_key)
- if worklist_set_dict_key not in worklist_set_dict:
- worklist_set_dict[worklist_set_dict_key] = {}
- worklist_set_dict[worklist_set_dict_key]\
- [workflow_worklist_key] = valid_criterion_dict
+ if getSecurityUidListAndRoleColumnDict is None:
+ valid_criterion_dict, metadata = getValidCriterionDict(
+ worklist_match_dict=worklist_match_dict,
+ acceptable_key_dict=acceptable_key_dict)
+ if metadata is not None:
+ metadata_dict[workflow_worklist_key] = metadata
+ updateWorklistSetDict(
+ worklist_set_dict=worklist_set_dict,
+ workflow_worklist_key=workflow_worklist_key,
+ valid_criterion_dict=valid_criterion_dict)
+ else:
+ security_parameter = worklist_match_dict.get(SECURITY_PARAMETER_ID, [])
+ security_kw = {}
+ if len(security_parameter):
+ security_kw[SECURITY_PARAMETER_ID] = security_parameter
+ uid_list, role_column_dict = getSecurityUidListAndRoleColumnDict(
+ **security_kw)
+ if len(uid_list):
+ uid_list.sort()
+ role_column_dict[SECURITY_COLUMN_ID] = uid_list
+ # Make sure every item is a list - or a tuple
+ for security_column_id in role_column_dict.iterkeys():
+ value = role_column_dict[security_column_id]
+ if not isinstance(value, (tuple, list)):
+ role_column_dict[security_column_id] = [value]
+ applied_security_criterion_dict = {}
+ # TODO: make security criterions be examined in the same order for all
+ # worklists if possible at all.
+ for security_column_id, security_column_value in \
+ role_column_dict.iteritems():
+ valid_criterion_dict = {}
+ valid_criterion_dict.update(applied_security_criterion_dict)
+ # Current security criterion must be applied to all further queries
+ # for this worklist negated, so the a given line cannot match multiple
+ # times.
+ applied_security_criterion_dict[security_column_id] = \
+ ExclusionList(security_column_value)
+ valid_criterion_dict[security_column_id] = security_column_value
+ for criterion_id, criterion_value in worklist_match_dict.iteritems():
+ if criterion_id in acceptable_key_dict:
+ if isinstance(criterion_value, tuple):
+ criterion_value = list(criterion_value)
+ assert criterion_id not in valid_criterion_dict
+ valid_criterion_dict[criterion_id] = criterion_value
+ elif criterion_id == WORKLIST_METADATA_KEY:
+ metadata_dict[workflow_worklist_key] = criterion_value
+ elif criterion_id == SECURITY_PARAMETER_ID:
+ pass
+ else:
+ LOG('WorkflowTool_listActions', WARNING, 'Worklist %s of ' \
+ 'workflow %s filters on variable %s which is not available ' \
+ 'in catalog. Its value will not be checked.' % \
+ (worklist_id, workflow_id, criterion_id))
+ worklist_set_dict_key = valid_criterion_dict.keys()
+ if len(worklist_set_dict_key):
+ worklist_set_dict_key.sort()
+ worklist_set_dict_key = tuple(worklist_set_dict_key)
+ if worklist_set_dict_key not in worklist_set_dict:
+ worklist_set_dict[worklist_set_dict_key] = {}
+ worklist_set_dict[worklist_set_dict_key]\
+ [workflow_worklist_key] = valid_criterion_dict
return worklist_set_dict.values(), metadata_dict
def generateNestedQuery(priority_list, criterion_dict,
More information about the Erp5-report mailing list