[Erp5-report] r19414 - in /erp5/trunk/products/ERP5Catalog: ./ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Feb 19 18:11:53 CET 2008
Author: romain
Date: Tue Feb 19 18:11:51 2008
New Revision: 19414
URL: http://svn.erp5.org?rev=19414&view=rev
Log:
Instead of generating security query which hardcoded column names, use new
configuration parameter defined on the catalog tool.
Modified:
erp5/trunk/products/ERP5Catalog/CatalogTool.py
erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py
Modified: erp5/trunk/products/ERP5Catalog/CatalogTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Catalog/CatalogTool.py?rev=19414&r1=19413&r2=19414&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Catalog/CatalogTool.py (original)
+++ erp5/trunk/products/ERP5Catalog/CatalogTool.py Tue Feb 19 18:11:51 2008
@@ -196,7 +196,6 @@
manage_options = ({ 'label' : 'Overview', 'action' : 'manage_overview' },
) + ZCatalog.manage_options
-
def __init__(self):
ZCatalog.__init__(self, self.getId())
@@ -447,12 +446,16 @@
user_is_superuser = (user_str == SUPER_USER)
allowedRolesAndUsers = self._listAllowedRolesAndUsers(user)
role_column_dict = {}
- column_map = self.getSQLCatalog(sql_catalog_id).getColumnMap()
+ local_role_column_dict = {}
+ catalog = self.getSQLCatalog(sql_catalog_id)
+ column_map = catalog.getColumnMap()
# Patch for ERP5 by JP Smets in order
# to implement worklists and search of local roles
if kw.has_key('local_roles'):
local_roles = kw['local_roles']
+ local_role_dict = dict(catalog.getSQLCatalogLocalRoleKeysList())
+ role_dict = dict(catalog.getSQLCatalogRoleKeysList())
# XXX user is not enough - we should also include groups of the user
# Only consider local_roles if it is not empty
if local_roles not in (None, '', []): # XXX: Maybe "if local_roles:" is enough.
@@ -460,27 +463,42 @@
# Turn it into a list if necessary according to ';' separator
if isinstance(local_roles, str):
local_roles = local_roles.split(';')
- local_roles = [x.lower() for x in local_roles]
# Local roles now has precedence (since it comes from a WorkList)
for user_or_group in allowedRolesAndUsers:
for role in local_roles:
# Performance optimisation
- if role in column_map:
+ if local_role_dict.has_key(role):
+ # XXX This should be a list
# If a given role exists as a column in the catalog,
# then it is considered as single valued and indexed
# through the catalog.
if not user_is_superuser:
- role_column_dict[role] = user_str # XXX This should be a list
- # which also includes all user groups
+ # XXX This should be a list
+ # which also includes all user groups
+ column_id = local_role_dict[role]
+ local_role_column_dict[column_id] = user_str
+ if role_dict.has_key(role):
+ # XXX This should be a list
+ # If a given role exists as a column in the catalog,
+ # then it is considered as single valued and indexed
+ # through the catalog.
+ if not user_is_superuser:
+ # XXX This should be a list
+ # which also includes all user groups
+ column_id = role_dict[role]
+ role_column_dict[column_id] = user_str
else:
# Else, we use the standard approach
new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role))
- allowedRolesAndUsers = new_allowedRolesAndUsers
+ if local_role_column_dict == {}:
+ allowedRolesAndUsers = new_allowedRolesAndUsers
+
else:
# We only consider here the Owner role (since it was not indexed)
# since some objects may only be visible by their owner
# which was not indexed
- if 'owner' in column_map:
+ for role, column_id in catalog.getSQLCatalogRoleKeysList():
+ # XXX This should be a list
if not user_is_superuser:
try:
# if called by an executable with proxy roles, we don't use
@@ -488,11 +506,11 @@
eo = getSecurityManager()._context.stack[-1]
proxy_roles = getattr(eo, '_proxy_roles', None)
if not proxy_roles:
- role_column_dict['owner'] = user_str
+ role_column_dict[column_id] = user_str
except IndexError:
- role_column_dict['owner'] = user_str
-
- return allowedRolesAndUsers, role_column_dict
+ role_column_dict[column_id] = user_str
+
+ return allowedRolesAndUsers, role_column_dict, local_role_column_dict
def getSecurityUidListAndRoleColumnDict(self, sql_catalog_id=None, **kw):
"""
@@ -503,7 +521,8 @@
site as long as security uids are considered consistent among all
catalogs.
"""
- allowedRolesAndUsers, role_column_dict = self.getAllowedRolesAndUsers(**kw)
+ allowedRolesAndUsers, role_column_dict, local_role_column_dict = \
+ self.getAllowedRolesAndUsers(**kw)
catalog = self.getSQLCatalog(sql_catalog_id)
method = getattr(catalog, catalog.sql_search_security, None)
if allowedRolesAndUsers:
@@ -534,7 +553,7 @@
security_uid_cache[cache_key] = security_uid_list
else:
security_uid_list = []
- return security_uid_list, role_column_dict
+ return security_uid_list, role_column_dict, local_role_column_dict
security.declarePublic('getSecurityQuery')
def getSecurityQuery(self, query=None, sql_catalog_id=None, **kw):
@@ -544,7 +563,9 @@
catalogued with columns.
"""
original_query = query
- security_uid_list, role_column_dict = self.getSecurityUidListAndRoleColumnDict(sql_catalog_id=sql_catalog_id, **kw)
+ security_uid_list, role_column_dict, local_role_column_dict = \
+ self.getSecurityUidListAndRoleColumnDict(
+ sql_catalog_id=sql_catalog_id, **kw)
if role_column_dict:
query_list = []
for key, value in role_column_dict.items():
@@ -560,6 +581,16 @@
query, operator='OR')
else:
query = Query(security_uid=security_uid_list, operator='IN')
+
+ if local_role_column_dict:
+ query_list = []
+ for key, value in local_role_column_dict.items():
+ new_query = Query(**{key : value})
+ query_list.append(new_query)
+ operator_kw = {'operator': 'AND'}
+ local_role_query = ComplexQuery(*query_list, **operator_kw)
+ query = ComplexQuery(query, local_role_query, operator='AND')
+
if original_query is not None:
query = ComplexQuery(query, original_query, operator='AND')
return query
Modified: erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py?rev=19414&r1=19413&r2=19414&view=diff
==============================================================================
--- erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py (original)
+++ erp5/trunk/products/ERP5Catalog/tests/testERP5Catalog.py Tue Feb 19 18:11:51 2008
@@ -1896,7 +1896,7 @@
self.assertEquals([], [x.getObject() for x in
obj.searchFolder(portal_type='Bank Account')])
- def test_60_OwnerIndexing(self, quiet=quiet, run=run_all_test):
+ def test_60_ViewableOwnerIndexing(self, quiet=quiet, run=run_all_test):
if not run:
return
@@ -1913,7 +1913,7 @@
sub_portal_type = self.getPortal().portal_types._getOb(sub_portal_type_id)
sql_connection = self.getSQLConnection()
- sql = 'select owner from catalog where uid=%s'
+ sql = 'select viewable_owner as owner from catalog where uid=%s'
login(self, 'super_owner')
@@ -2292,6 +2292,198 @@
result = query('SELECT roles_and_users.uid, roles_and_users.allowedRolesAndUsers FROM roles_and_users, catalog WHERE roles_and_users.uid = catalog.security_uid AND catalog.uid = %i AND roles_and_users.allowedRolesAndUsers LIKE "user:bar%%"' % (object.uid, ))
self.assertEqual(len(result), 0, '%r: len(%r) != 0' % (getObjectDictKey(), result))
+ def test_RealOwnerIndexing(self, quiet=quiet, run=run_all_test):
+ if not run:
+ return
+
+ login = PortalTestCase.login
+ logout = self.logout
+ user1 = 'local_foo'
+ user2 = 'local_bar'
+ uf = self.getPortal().acl_users
+ uf._doAddUser(user1, user1, ['Member', ], [])
+ uf._doAddUser(user2, user2, ['Member', ], [])
+
+ perm = 'View'
+ folder = self.getOrganisationModule()
+ folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
+ folder.manage_setLocalRoles(user2, ['Author', 'Auditor'])
+ portal_type = 'Organisation'
+
+ sql_connection = self.getSQLConnection()
+
+ login(self, user2)
+ obj2 = folder.newContent(portal_type=portal_type)
+ obj2.manage_setLocalRoles(user1, ['Auditor'])
+ obj2.manage_permission(perm, ['Owner', 'Auditor'], 0)
+
+ login(self, user1)
+
+ obj = folder.newContent(portal_type=portal_type)
+ obj.manage_setLocalRoles(user1, ['Owner', 'Auditor'])
+
+ # Check that nothing is returned when user can not view the object
+ obj.manage_permission(perm, [], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
+ self.assertSameSet([], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Auditor'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
+ self.assertSameSet([], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Owner'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
+ self.assertSameSet([obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+
+ # Add a new table to the catalog
+ sql_catalog = self.portal.portal_catalog.getSQLCatalog()
+
+ local_roles_table = "test_local_roles"
+
+ create_local_role_table_sql = """
+CREATE TABLE `%s` (
+ `uid` BIGINT UNSIGNED NOT NULL,
+ `owner_reference` varchar(32) NOT NULL default '',
+ PRIMARY KEY (`uid`),
+ KEY `version` (`owner_reference`)
+) TYPE=InnoDB;
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_create_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = create_local_role_table_sql)
+
+ drop_local_role_table_sql = """
+DROP TABLE IF EXISTS %s
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z0_drop_%s' % local_roles_table,
+ title = '',
+ arguments = "",
+ connection_id = 'erp5_sql_connection',
+ template = drop_local_role_table_sql)
+
+ catalog_local_role_sql = """
+REPLACE INTO
+ %s
+VALUES
+<dtml-in prefix="loop" expr="_.range(_.len(uid))">
+(
+ <dtml-sqlvar expr="uid[loop_item]" type="int">,
+ <dtml-sqlvar expr="Base_getOwnerId[loop_item]" type="string" optional>
+)
+<dtml-if sequence-end>
+<dtml-else>
+,
+</dtml-if>
+</dtml-in>
+ """ % local_roles_table
+ sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
+ id = 'z_catalog_%s_list' % local_roles_table,
+ title = '',
+ connection_id = 'erp5_sql_connection',
+ arguments = "\n".join(['uid',
+ 'Base_getOwnerId']),
+ template = catalog_local_role_sql)
+
+ get_transaction().commit()
+ current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list + \
+ ('z_catalog_%s_list' % local_roles_table,)
+ current_sql_clear_catalog = sql_catalog.sql_clear_catalog
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog + \
+ ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
+ current_sql_catalog_local_role_keys = \
+ sql_catalog.sql_catalog_local_role_keys
+ sql_catalog.sql_catalog_local_role_keys = ('Owner | %s.owner_reference' % \
+ local_roles_table,)
+ current_sql_search_tables = sql_catalog.sql_search_tables
+ sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
+ [local_roles_table]
+ get_transaction().commit()
+
+ try:
+ # Clear catalog
+ portal_catalog = self.getCatalogTool()
+ portal_catalog.manage_catalogClear()
+ get_transaction().commit()
+ self.portal.portal_caches.clearAllCache()
+ get_transaction().commit()
+ obj2.reindexObject()
+
+ # Check that nothing is returned when user can not view the object
+ obj.manage_permission(perm, [], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+ method = obj.portal_catalog
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
+ self.assertSameSet([], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Auditor'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
+ self.assertSameSet([obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+
+ # Check that object is returned when he can view the object
+ obj.manage_permission(perm, ['Owner'], 0)
+ obj.reindexObject()
+ get_transaction().commit()
+ self.tic()
+ result = obj.portal_catalog(portal_type=portal_type)
+ self.assertSameSet([obj2, obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
+ self.assertSameSet([obj], [x.getObject() for x in result])
+ result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
+ self.assertSameSet([obj2, ], [x.getObject() for x in result])
+ finally:
+ sql_catalog.sql_catalog_object_list = \
+ current_sql_catalog_object_list
+ sql_catalog.sql_clear_catalog = \
+ current_sql_clear_catalog
+ sql_catalog.sql_catalog_local_role_keys = \
+ current_sql_catalog_local_role_keys
+ sql_catalog.sql_search_tables = current_sql_search_tables
+ get_transaction().commit()
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestERP5Catalog))
More information about the Erp5-report
mailing list