[Erp5-report] r25449 - in /erp5/trunk/products/ERP5Catalog: ./ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Feb 4 17:47:16 CET 2009
Author: romain
Date: Wed Feb 4 17:47:14 2009
New Revision: 25449
URL: http://svn.erp5.org?rev=25449&view=rev
Log:
Allow roles other than Owner to be catalogued in a monovalued column.
A monovalued column only indexes user IDs, not security groups.
If security groups are assigned to such a role, they are still catalogued in
the roles_and_users table.
Modified:
erp5/trunk/products/ERP5Catalog/CatalogTool.py
erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py
Modified: erp5/trunk/products/ERP5Catalog/CatalogTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Catalog/CatalogTool.py?rev=25449&r1=25448&r2=25449&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Catalog/CatalogTool.py [utf8] (original)
+++ erp5/trunk/products/ERP5Catalog/CatalogTool.py [utf8] Wed Feb 4 17:47:14 2009
@@ -103,17 +103,9 @@
else:
self.__dict__[name] = value
- def allowedRolesAndUsers(self):
- """
- Return a list of roles and users with View permission.
- Used by Portal Catalog to filter out items you're not allowed to see.
-
- WARNING (XXX): some user base local role association is currently
- being stored (ex. to be determined). This should be prevented or it will
- make the table explode. To analyse the symptoms, look at the
- user_and_roles table. You will find some user:foo values
- which are not necessary.
- """
+ def _getSecurityParameterList(self):
+ result_key = '_cache_result'
+ if result_key not in self.__dict__:
ob = self.__ob
security_product = getSecurityProduct(ob.acl_users)
withnuxgroups = security_product == SECURITY_USING_NUX_USER_GROUPS
@@ -147,6 +139,12 @@
if len(new_role_list)>0:
flat_localroles[key] = new_role_list
localroles = flat_localroles
+
+ portal = self.getPortalObject()
+ role_dict = dict(portal.portal_catalog.getSQLCatalog().\
+ getSQLCatalogRoleKeysList())
+ getUserById = portal.acl_users.getUserById
+
# For each local role of a user:
# If the local role grants View permission, add it.
# Every addition implies 2 lines:
@@ -154,18 +152,72 @@
# user:<user_id>:<role_id>
# A line must not be present twice in final result.
allowed = sets.Set(rolesForPermissionOn('View', ob))
+ # XXX Owner is hardcoded, in order to prevent searching for user on the
+ # site root.
allowed.discard('Owner')
add = allowed.add
+ user_role_dict = {}
+ user_view_permission_role_dict = {}
for user, roles in localroles.iteritems():
if withnuxgroups:
prefix = user
else:
prefix = 'user:' + user
for role in roles:
- if role in allowed:
+ if (role in role_dict) and (getUserById(user) is not None):
+ # If role is monovalued, check if key is a user.
+ # If not, continue to index it in roles_and_users table.
+ user_role_dict[role] = user
+ if role in allowed:
+ user_view_permission_role_dict[role] = user
+ elif role in allowed:
add(prefix)
add(prefix + ':' + role)
- return list(allowed)
+
+ # __setattr__ explicitely set the parameter on the wrapper
+ setattr(self, result_key,
+ (list(allowed), user_role_dict,
+ user_view_permission_role_dict))
+
+ # Return expected value
+ return self.__dict__[result_key]
+
+ def allowedRolesAndUsers(self):
+ """
+ Return a list of roles and users with View permission.
+ Used by Portal Catalog to filter out items you're not allowed to see.
+
+ WARNING (XXX): some user base local role association is currently
+ being stored (ex. to be determined). This should be prevented or it will
+ make the table explode. To analyse the symptoms, look at the
+ user_and_roles table. You will find some user:foo values
+ which are not necessary.
+ """
+ return self._getSecurityParameterList()[0]
+
+ def getAssignee(self):
+ """Returns the user ID of the user with 'Assignee' local role on this
+ document.
+
+ If there is more than one Assignee local role, the result is undefined.
+ """
+ return self._getSecurityParameterList()[1].get('Assignee', None)
+
+ def getViewPermissionAssignee(self):
+ """Returns the user ID of the user with 'Assignee' local role on this
+ document, if the Assignee role has View permission.
+
+ If there is more than one Assignee local role, the result is undefined.
+ """
+ return self._getSecurityParameterList()[2].get('Assignee', None)
+
+ def getViewPermissionAssignor(self):
+ """Returns the user ID of the user with 'Assignor' local role on this
+ document, if the Assignor role has View permission.
+
+ If there is more than one Assignor local role, the result is undefined.
+ """
+ return self._getSecurityParameterList()[2].get('Assignor', None)
def __repr__(self):
return '<Products.ERP5Catalog.CatalogTool.IndexableObjectWrapper'\
@@ -467,65 +519,63 @@
catalog = self.getSQLCatalog(sql_catalog_id)
column_map = catalog.getColumnMap()
+ # We only consider here the Owner role (since it was not indexed)
+ # since some objects may only be visible by their owner
+ # which was not indexed
+ for role, column_id in catalog.getSQLCatalogRoleKeysList():
+ # XXX This should be a list
+ if not user_is_superuser:
+ try:
+ # if called by an executable with proxy roles, we don't use
+ # owner, but only roles from the proxy.
+ eo = getSecurityManager()._context.stack[-1]
+ proxy_roles = getattr(eo, '_proxy_roles', None)
+ if not proxy_roles:
+ role_column_dict[column_id] = user_str
+ except IndexError:
+ role_column_dict[column_id] = user_str
+
# Patch for ERP5 by JP Smets in order
# to implement worklists and search of local roles
- if kw.has_key('local_roles'):
- local_roles = kw['local_roles']
+ local_roles = kw.get('local_roles', None)
+ if local_roles:
local_role_dict = dict(catalog.getSQLCatalogLocalRoleKeysList())
role_dict = dict(catalog.getSQLCatalogRoleKeysList())
# XXX user is not enough - we should also include groups of the user
- # Only consider local_roles if it is not empty
- if local_roles not in (None, '', []): # XXX: Maybe "if local_roles:" is enough.
- new_allowedRolesAndUsers = []
- # Turn it into a list if necessary according to ';' separator
- if isinstance(local_roles, str):
- local_roles = local_roles.split(';')
- # Local roles now has precedence (since it comes from a WorkList)
- for user_or_group in allowedRolesAndUsers:
- for role in local_roles:
- # Performance optimisation
- if local_role_dict.has_key(role):
+ new_allowedRolesAndUsers = []
+ new_role_column_dict = {}
+ # Turn it into a list if necessary according to ';' separator
+ if isinstance(local_roles, str):
+ local_roles = local_roles.split(';')
+ # Local roles now has precedence (since it comes from a WorkList)
+ for user_or_group in allowedRolesAndUsers:
+ for role in local_roles:
+ # Performance optimisation
+ if local_role_dict.has_key(role):
+ # XXX This should be a list
+ # If a given role exists as a column in the catalog,
+ # then it is considered as single valued and indexed
+ # through the catalog.
+ if not user_is_superuser:
# XXX This should be a list
- # If a given role exists as a column in the catalog,
- # then it is considered as single valued and indexed
- # through the catalog.
- if not user_is_superuser:
- # XXX This should be a list
- # which also includes all user groups
- column_id = local_role_dict[role]
- local_role_column_dict[column_id] = user_str
- if role_dict.has_key(role):
+ # which also includes all user groups
+ column_id = local_role_dict[role]
+ local_role_column_dict[column_id] = user_str
+ if role_dict.has_key(role):
+ # XXX This should be a list
+ # If a given role exists as a column in the catalog,
+ # then it is considered as single valued and indexed
+ # through the catalog.
+ if not user_is_superuser:
# XXX This should be a list
- # If a given role exists as a column in the catalog,
- # then it is considered as single valued and indexed
- # through the catalog.
- if not user_is_superuser:
- # XXX This should be a list
- # which also includes all user groups
- column_id = role_dict[role]
- role_column_dict[column_id] = user_str
- else:
- # Else, we use the standard approach
- new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role))
- if local_role_column_dict == {}:
- allowedRolesAndUsers = new_allowedRolesAndUsers
-
- else:
- # We only consider here the Owner role (since it was not indexed)
- # since some objects may only be visible by their owner
- # which was not indexed
- for role, column_id in catalog.getSQLCatalogRoleKeysList():
- # XXX This should be a list
- if not user_is_superuser:
- try:
- # if called by an executable with proxy roles, we don't use
- # owner, but only roles from the proxy.
- eo = getSecurityManager()._context.stack[-1]
- proxy_roles = getattr(eo, '_proxy_roles', None)
- if not proxy_roles:
- role_column_dict[column_id] = user_str
- except IndexError:
- role_column_dict[column_id] = user_str
+ # which also includes all user groups
+ column_id = role_dict[role]
+ new_role_column_dict[column_id] = user_str
+ new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role))
+ if local_role_column_dict == {}:
+ allowedRolesAndUsers = new_allowedRolesAndUsers
+ role_column_dict = new_role_column_dict
+
return allowedRolesAndUsers, role_column_dict, local_role_column_dict
Modified: erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py?rev=25449&r1=25448&r2=25449&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py [utf8] (original)
+++ erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py [utf8] Wed Feb 4 17:47:14 2009
@@ -2286,6 +2286,17 @@
sql_connection = self.getSQLConnection()
portal = self.getPortalObject()
portal_types = portal.portal_types
+
+ uf = self.getPortal().acl_users
+ uf._doAddUser('foo', 'foo', ['Member', ], [])
+ uf._doAddUser('ERP5TypeTestCase', 'ERP5TypeTestCase', ['Member', ], [])
+ get_transaction().commit()
+ portal_catalog = self.getCatalogTool()
+ portal_catalog.manage_catalogClear()
+ self.getPortal().ERP5Site_reindexAll()
+ get_transaction().commit()
+ self.tic()
+
# Person stuff
person_module = portal.person_module
@@ -2589,6 +2600,710 @@
current_sql_clear_catalog
sql_catalog.sql_catalog_local_role_keys = \
current_sql_catalog_local_role_keys
+ sql_catalog.sql_search_tables = current_sql_search_tables
+ get_transaction().commit()
+
+ def test_MonoValueAssigneeIndexing(self, quiet=quiet, run=run_all_test):
+ if not run:
+ return
+
+ login = PortalTestCase.login
+ logout = self.logout
+ user1 = 'local_foo'
+ user2 = 'local_bar'
+ uf = self.getPortal().acl_users
+ uf._doAddUser(user1, user1, ['Member', ], [])
+ uf._doAddUser(user2, user2, ['Member', ], [])
+
+ perm = 'View'
+ folder = self.getOrganisationModule()
+ folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
+ folder.manage_setLocalRoles(user2, ['Author', 'Auditor'])
+ portal_type = 'Organisation'
+
+ sql_connection = self.getSQLConnection()
+
+ login(self, user2)
+ obj2 = folder.newContent(portal_type=portal_type)
+ obj2.manage_setLocalRoles(user1, ['Auditor'])
+ obj2.manage_permission(perm, ['Assignee', 'Auditor'], 0)
+
+ login(self, user1)
+
+ obj = folder.newContent(portal_type=portal_type)
+ obj.manage_setLocalRoles(user1, ['Assignee', 'Auditor'])
+
+ # Check that nothing is returned when user can not view the object
+ obj.manage_permission(perm, [], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
+ self.assertSameSet([], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Auditor'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
+ self.assertSameSet([], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Assignee'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
+ self.assertSameSet([obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+
+ # Add a new table to the catalog
+ sql_catalog = self.portal.portal_catalog.getSQLCatalog()
+
+ local_roles_table = "test_assignee_local_roles"
+
+ create_local_role_table_sql = """
+CREATE TABLE `%s` (
+ `uid` BIGINT UNSIGNED NOT NULL,
+ `assignee_reference` varchar(32) NOT NULL default '',
+ `viewable_assignee_reference` varchar(32) NOT NULL default '',
+ PRIMARY KEY (`uid`),
+ KEY `assignee_reference` (`assignee_reference`),
+ KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
+) TYPE=InnoDB;
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_create_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = create_local_role_table_sql)
+
+ drop_local_role_table_sql = """
+DROP TABLE IF EXISTS %s
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z0_drop_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = drop_local_role_table_sql)
+
+ catalog_local_role_sql = """
+REPLACE INTO
+ %s
+VALUES
+<dtml-in prefix="loop" expr="_.range(_.len(uid))">
+(
+ <dtml-sqlvar expr="uid[loop_item]" type="int">,
+ <dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>,
+ <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
+)
+<dtml-if sequence-end>
+<dtml-else>
+,
+</dtml-if>
+</dtml-in>
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_catalog_%s_list' % local_roles_table,
+ title = '',
+ connection_id = 'erp5_sql_connection',
+ arguments = "\n".join(['uid',
+ 'getAssignee',
+ 'getViewPermissionAssignee']),
+ template = catalog_local_role_sql)
+
+ get_transaction().commit()
+ current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list + \
+ ('z_catalog_%s_list' % local_roles_table,)
+ current_sql_clear_catalog = sql_catalog.sql_clear_catalog
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog + \
+ ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
+ current_sql_catalog_local_role_keys = \
+ sql_catalog.sql_catalog_local_role_keys
+ sql_catalog.sql_catalog_local_role_keys = ('Assignee | %s.assignee_reference' % \
+ local_roles_table,)
+
+ current_sql_catalog_role_keys = \
+ sql_catalog.sql_catalog_role_keys
+ sql_catalog.sql_catalog_role_keys = (
+ 'Assignee | %s.viewable_assignee_reference' % \
+ local_roles_table,)
+
+ current_sql_search_tables = sql_catalog.sql_search_tables
+ sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
+ [local_roles_table]
+ get_transaction().commit()
+
+ try:
+ # Clear catalog
+ portal_catalog = self.getCatalogTool()
+ portal_catalog.manage_catalogClear()
+ get_transaction().commit()
+ self.portal.portal_caches.clearAllCache()
+ get_transaction().commit()
+ obj2.reindexObject()
+
+ # Check that nothing is returned when user can not view the object
+ obj.manage_permission(perm, [], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+ method = obj.portal_catalog
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
+ self.assertSameSet([], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Auditor'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
+ self.assertSameSet([obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Assignee'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
+ self.assertSameSet([obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+ finally:
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog
+ sql_catalog.sql_catalog_local_role_keys = \
+ current_sql_catalog_local_role_keys
+ sql_catalog.sql_catalog_role_keys = \
+ current_sql_catalog_role_keys
+ sql_catalog.sql_search_tables = current_sql_search_tables
+ get_transaction().commit()
+
+ def test_UserOrGroupRoleIndexing(self, quiet=quiet, run=run_all_test):
+ if not run:
+ return
+
+ login = PortalTestCase.login
+ logout = self.logout
+ user1 = 'a_great_user_name'
+ user1_group = 'a_great_user_group'
+ uf = self.getPortal().acl_users
+ uf._doAddUser(user1, user1, ['Member', ], [])
+ uf.zodb_groups.addGroup( user1_group, user1_group, user1_group)
+ new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group )
+
+ perm = 'View'
+ folder = self.getOrganisationModule()
+ folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
+ portal_type = 'Organisation'
+ organisation = folder.newContent(portal_type=portal_type)
+
+ sql_connection = self.getSQLConnection()
+ def query(sql):
+ result = sql_connection.manage_test(sql)
+ return result.dictionaries()
+
+ login(self, user1)
+
+ # Add a new table to the catalog
+ sql_catalog = self.portal.portal_catalog.getSQLCatalog()
+
+ local_roles_table = "test_user_or_group_local_roles"
+
+ create_local_role_table_sql = """
+CREATE TABLE `%s` (
+ `uid` BIGINT UNSIGNED NOT NULL,
+ `assignee_reference` varchar(32) NOT NULL default '',
+ `viewable_assignee_reference` varchar(32) NOT NULL default '',
+ PRIMARY KEY (`uid`),
+ KEY `assignee_reference` (`assignee_reference`),
+ KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
+) TYPE=InnoDB;
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_create_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = create_local_role_table_sql)
+
+ drop_local_role_table_sql = """
+DROP TABLE IF EXISTS %s
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z0_drop_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = drop_local_role_table_sql)
+
+ catalog_local_role_sql = """
+REPLACE INTO
+ %s
+VALUES
+<dtml-in prefix="loop" expr="_.range(_.len(uid))">
+(
+ <dtml-sqlvar expr="uid[loop_item]" type="int">,
+ <dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>,
+ <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
+)
+<dtml-if sequence-end>
+<dtml-else>
+,
+</dtml-if>
+</dtml-in>
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_catalog_%s_list' % local_roles_table,
+ title = '',
+ connection_id = 'erp5_sql_connection',
+ arguments = "\n".join(['uid',
+ 'getAssignee',
+ 'getViewPermissionAssignee']),
+ template = catalog_local_role_sql)
+
+ get_transaction().commit()
+ current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list + \
+ ('z_catalog_%s_list' % local_roles_table,)
+ current_sql_clear_catalog = sql_catalog.sql_clear_catalog
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog + \
+ ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
+ current_sql_catalog_local_role_keys = \
+ sql_catalog.sql_catalog_local_role_keys
+ sql_catalog.sql_catalog_local_role_keys = (
+ 'Owner | viewable_owner',
+ 'Assignee | %s.assignee_reference' % \
+ local_roles_table,)
+
+ current_sql_catalog_role_keys = \
+ sql_catalog.sql_catalog_role_keys
+ sql_catalog.sql_catalog_role_keys = (
+ 'Assignee | %s.viewable_assignee_reference' % \
+ local_roles_table,)
+
+ current_sql_search_tables = sql_catalog.sql_search_tables
+ sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
+ [local_roles_table]
+
+ portal = self.getPortal()
+ get_transaction().commit()
+
+ try:
+ # Clear catalog
+ portal_catalog = self.getCatalogTool()
+ portal_catalog.manage_catalogClear()
+ get_transaction().commit()
+ self.portal.portal_caches.clearAllCache()
+ get_transaction().commit()
+
+ organisation_relative_url = organisation.getRelativeUrl()
+ countResults = organisation.portal_catalog.countResults
+ count_result_kw = {'relative_url': organisation_relative_url}
+
+ use_case_number = 0
+ for view_permission_role_list, security_group_list, \
+ global_view, associate_view, assignee_view, both_view in \
+ [
+ # No view permission
+ ([], [], 0, 0, 0, 0),
+ ([], [(user1, ['Associate'])], 0, 0, 0, 0),
+ ([], [(user1, ['Assignee'])], 0, 0, 0, 0),
+ ([], [(user1, ['Assignee', 'Associate'])], 0, 0, 0, 0),
+ ([], [(user1_group, ['Assignee'])], 0, 0, 0, 0),
+ ([], [(user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0),
+ ([], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 0, 0, 0, 0),
+ ([], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0),
+
+ # View permission for Assignee
+ (['Assignee'], [], 0, 0, 0, 0),
+ (['Assignee'], [(user1, ['Associate'])], 0, 0, 0, 0),
+ (['Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1),
+ (['Assignee'], [(user1, ['Assignee', 'Associate'])], 1, 0, 1, 1),
+ (['Assignee'], [(user1_group, ['Assignee'])], 1, 0, 0, 0),
+ (['Assignee'], [(user1_group, ['Assignee', 'Associate'])],
+ 1, 0, 0, 0),
+ (['Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 1, 0, 1, 1),
+ (['Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])],
+ 1, 0, 1, 1),
+
+ # View permission for Associate
+ (['Associate'], [], 0, 0, 0, 0),
+ (['Associate'], [(user1, ['Associate'])], 1, 1, 0, 0),
+ (['Associate'], [(user1, ['Assignee'])], 0, 0, 0, 0),
+ (['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1),
+ (['Associate'], [(user1_group, ['Assignee'])], 0, 0, 0, 0),
+ (['Associate'], [(user1_group, ['Assignee', 'Associate'])],
+ 1, 1, 0, 0),
+ (['Associate'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 0, 0, 0, 0),
+ (['Associate'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])],
+ 1, 1, 1, 1),
+
+ # View permission for Associate and Assignee
+ (['Associate', 'Assignee'], [], 0, 0, 0, 0),
+ (['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 1, 0, 0),
+ (['Associate', 'Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1),
+ (['Associate', 'Assignee'],
+ [(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1),
+ (['Associate', 'Assignee'],
+ [(user1_group, ['Assignee'])], 1, 0, 0, 0),
+ (['Associate', 'Assignee'],
+ [(user1_group, ['Assignee', 'Associate'])], 1, 1, 0, 0),
+ (['Associate', 'Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 1, 0, 1, 1),
+ (['Associate', 'Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])],
+ 1, 1, 1, 1),
+ ]:
+
+ use_case_number += 1
+ organisation.manage_permission(perm, view_permission_role_list, 0)
+ organisation.manage_delLocalRoles([user1, user1_group])
+ for security_group, local_role_list in security_group_list:
+ organisation.manage_setLocalRoles(security_group, local_role_list)
+ organisation.reindexObject()
+ get_transaction().commit()
+ self.tic()
+
+ for expected_result, local_roles in \
+ [
+ (global_view, None),
+ (associate_view, 'Associate'),
+ (assignee_view, 'Assignee'),
+ (both_view, ['Associate', 'Assignee']),
+ ]:
+
+ object_security_uid = query(
+ 'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \
+ organisation_relative_url
+ )[0]['security_uid']
+
+ if object_security_uid is not None:
+ roles_and_users = query(
+ 'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \
+ object_security_uid
+ )
+ else:
+ roles_and_users = ''
+
+ monovalue_references = query(
+ 'SELECT * FROM test_user_or_group_local_roles WHERE uid="%s"' % \
+ organisation.getUid())[0]
+ assignee_reference = monovalue_references['assignee_reference']
+ viewable_assignee_reference = \
+ monovalue_references['viewable_assignee_reference']
+
+ result = countResults(local_roles=local_roles, **count_result_kw)[0][0]
+ if result != expected_result:
+ countResults(local_roles=local_roles, src__=1,
+ **count_result_kw)
+ self.fail('Use case %s\n\tView permission is given to: %s\n\t' \
+ 'Local roles are: %s\n\t' \
+ 'local_roles parameter is: %s\n\t' \
+ 'Object IS %s returned by portal_catalog!\n\t' \
+ '\n\tSecurity uid is: %s\n\t'
+ 'Roles and users: %s\n\t'
+ 'assignee_reference: %s\n\t'
+ 'viewable_assignee_reference: %s\n\t'
+ '\n\tSQL generated: \n\n%s' \
+ '' % \
+ (use_case_number,
+ view_permission_role_list,
+ organisation.__ac_local_roles__,
+ local_roles, ['NOT', ''][result],
+ object_security_uid,
+ str([x['allowedRolesAndUsers'] for x in roles_and_users]),
+ assignee_reference,
+ viewable_assignee_reference,
+ countResults(local_roles=local_roles, src__=1,
+ **count_result_kw)))
+
+ finally:
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog
+ sql_catalog.sql_catalog_local_role_keys = \
+ current_sql_catalog_local_role_keys
+ sql_catalog.sql_catalog_role_keys = \
+ current_sql_catalog_role_keys
+ sql_catalog.sql_search_tables = current_sql_search_tables
+ get_transaction().commit()
+
+ def test_UserOrGroupLocalRoleIndexing(self, quiet=quiet, run=run_all_test):
+ if not run:
+ return
+
+ login = PortalTestCase.login
+ logout = self.logout
+ user1 = 'another_great_user_name'
+ user1_group = 'another_great_user_group'
+ uf = self.getPortal().acl_users
+ uf._doAddUser(user1, user1, ['Member', ], [])
+ uf.zodb_groups.addGroup( user1_group, user1_group, user1_group)
+ new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group )
+
+ perm = 'View'
+ folder = self.getOrganisationModule()
+ folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
+ portal_type = 'Organisation'
+ organisation = folder.newContent(portal_type=portal_type)
+
+ sql_connection = self.getSQLConnection()
+ def query(sql):
+ result = sql_connection.manage_test(sql)
+ return result.dictionaries()
+
+ login(self, user1)
+
+ # Add a new table to the catalog
+ sql_catalog = self.portal.portal_catalog.getSQLCatalog()
+
+ local_roles_table = "another_test_user_or_group_local_roles"
+
+ create_local_role_table_sql = """
+CREATE TABLE `%s` (
+ `uid` BIGINT UNSIGNED NOT NULL,
+ `viewable_assignee_reference` varchar(32) NOT NULL default '',
+ PRIMARY KEY (`uid`),
+ KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
+) TYPE=InnoDB;
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_create_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = create_local_role_table_sql)
+
+ drop_local_role_table_sql = """
+DROP TABLE IF EXISTS %s
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z0_drop_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = drop_local_role_table_sql)
+
+ catalog_local_role_sql = """
+REPLACE INTO
+ %s
+VALUES
+<dtml-in prefix="loop" expr="_.range(_.len(uid))">
+(
+ <dtml-sqlvar expr="uid[loop_item]" type="int">,
+ <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
+)
+<dtml-if sequence-end>
+<dtml-else>
+,
+</dtml-if>
+</dtml-in>
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_catalog_%s_list' % local_roles_table,
+ title = '',
+ connection_id = 'erp5_sql_connection',
+ arguments = "\n".join(['uid',
+ 'getViewPermissionAssignee']),
+ template = catalog_local_role_sql)
+
+ get_transaction().commit()
+ current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list + \
+ ('z_catalog_%s_list' % local_roles_table,)
+ current_sql_clear_catalog = sql_catalog.sql_clear_catalog
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog + \
+ ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
+
+ current_sql_catalog_role_keys = \
+ sql_catalog.sql_catalog_role_keys
+ sql_catalog.sql_catalog_role_keys = (
+ 'Owner | viewable_owner',
+ 'Assignee | %s.viewable_assignee_reference' % \
+ local_roles_table,)
+
+ current_sql_search_tables = sql_catalog.sql_search_tables
+ sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
+ [local_roles_table]
+
+ portal = self.getPortal()
+ get_transaction().commit()
+
+ try:
+ # Clear catalog
+ portal_catalog = self.getCatalogTool()
+ portal_catalog.manage_catalogClear()
+ get_transaction().commit()
+ self.portal.portal_caches.clearAllCache()
+ get_transaction().commit()
+
+ organisation_relative_url = organisation.getRelativeUrl()
+ countResults = organisation.portal_catalog.countResults
+ count_result_kw = {'relative_url': organisation_relative_url}
+
+ use_case_number = 0
+ for view_permission_role_list, security_group_list, \
+ associate_view, assignee_view in \
+ [
+ # No view permission
+ ([], [], 0, 0),
+ ([], [(user1, ['Associate'])], 0, 0),
+ ([], [(user1, ['Assignee'])], 0, 0),
+ ([], [(user1, ['Assignee', 'Associate'])], 0, 0),
+ ([], [(user1_group, ['Assignee'])], 0, 0),
+ ([], [(user1_group, ['Assignee', 'Associate'])], 0, 0),
+ ([], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 0, 0),
+ ([], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])], 0, 0),
+
+ # View permission for Assignee
+ (['Assignee'], [], 0, 0),
+ (['Assignee'], [(user1, ['Associate'])], 0, 0),
+ (['Assignee'], [(user1, ['Assignee'])], 0, 1),
+ (['Assignee'], [(user1, ['Assignee', 'Associate'])], 0, 1),
+ (['Assignee'], [(user1_group, ['Assignee'])], 0, 1),
+ (['Assignee'], [(user1_group, ['Assignee', 'Associate'])], 0, 1),
+ (['Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 0, 1),
+ (['Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])], 0, 1),
+
+ # View permission for Associate
+ (['Associate'], [], 0, 0),
+ (['Associate'], [(user1, ['Associate'])], 1, 0),
+ (['Associate'], [(user1, ['Assignee'])], 0, 0),
+ (['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 0),
+ (['Associate'], [(user1_group, ['Assignee'])], 0, 0),
+ (['Associate'], [(user1_group, ['Assignee', 'Associate'])], 1, 0),
+ (['Associate'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 0, 0),
+ (['Associate'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])], 1, 0),
+
+ # View permission for Associate and Assignee
+ (['Associate', 'Assignee'], [], 0, 0),
+ (['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 0),
+ (['Associate', 'Assignee'], [(user1, ['Assignee'])], 0, 1),
+ (['Associate', 'Assignee'],
+ [(user1, ['Assignee', 'Associate'])], 1, 1),
+ (['Associate', 'Assignee'],
+ [(user1_group, ['Assignee'])], 0, 1),
+ (['Associate', 'Assignee'],
+ [(user1_group, ['Assignee', 'Associate'])], 1, 1),
+ (['Associate', 'Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee'])], 0, 1),
+ (['Associate', 'Assignee'], [(user1, ['Assignee']),
+ (user1_group, ['Assignee', 'Associate'])], 1, 1),
+ ]:
+
+ use_case_number += 1
+ organisation.manage_permission(perm, view_permission_role_list, 0)
+ organisation.manage_delLocalRoles([user1, user1_group])
+ for security_group, local_role_list in security_group_list:
+ organisation.manage_setLocalRoles(security_group, local_role_list)
+ organisation.reindexObject()
+ get_transaction().commit()
+ self.tic()
+
+ for expected_result, local_roles in \
+ [
+ (associate_view or assignee_view, None),
+ (associate_view, 'Associate'),
+ (assignee_view, 'Assignee'),
+ (associate_view or assignee_view, ['Associate', 'Assignee']),
+ ]:
+
+ object_security_uid = query(
+ 'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \
+ organisation_relative_url
+ )[0]['security_uid']
+
+ if object_security_uid is not None:
+ roles_and_users = query(
+ 'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \
+ object_security_uid
+ )
+ else:
+ roles_and_users = ''
+
+ monovalue_references = query(
+ 'SELECT * FROM %s WHERE uid="%s"' % \
+ (local_roles_table, organisation.getUid()))[0]
+ viewable_assignee_reference = \
+ monovalue_references['viewable_assignee_reference']
+
+ result = countResults(local_roles=local_roles, **count_result_kw)[0][0]
+ if result != expected_result:
+ countResults(local_roles=local_roles, src__=1,
+ **count_result_kw)
+ self.fail('Use case %s\n\tView permission is given to: %s\n\t' \
+ 'Local roles are: %s\n\t' \
+ 'local_roles parameter is: %s\n\t' \
+ 'Object IS %s returned by portal_catalog!\n\t' \
+ '\n\tSecurity uid is: %s\n\t'
+ 'Roles and users: %s\n\t'
+ 'viewable_assignee_reference: %s\n\t'
+ '\n\tSQL generated: \n\n%s' \
+ '' % \
+ (use_case_number,
+ view_permission_role_list,
+ organisation.__ac_local_roles__,
+ local_roles, ['NOT', ''][result],
+ object_security_uid,
+ str([x['allowedRolesAndUsers'] for x in roles_and_users]),
+ viewable_assignee_reference,
+ countResults(local_roles=local_roles, src__=1,
+ **count_result_kw)))
+
+ finally:
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog
+ sql_catalog.sql_catalog_role_keys = \
+ current_sql_catalog_role_keys
sql_catalog.sql_search_tables = current_sql_search_tables
get_transaction().commit()
More information about the Erp5-report
mailing list