[Erp5-report] r34543 daniele - in /erp5/trunk/products/ERP5: ./ Document/ PropertySheet/ To...
nobody at svn.erp5.org
nobody at svn.erp5.org
Wed Apr 14 13:22:49 CEST 2010
Author: daniele
Date: Wed Apr 14 13:22:48 2010
New Revision: 34543
URL: http://svn.erp5.org?rev=34543&view=rev
Log:
Add a new API to the id tool:
- add the generic id generator
- add the ZODB continuous increasing id generator
- add the SQL non-continuous increasing id generator
- change the id tool to use the id generators while keeping compatibility with the old API
- change the business template so that the generator dictionaries are not modified
during the export and the install of a bt
- update the test of the id tool
Added:
erp5/trunk/products/ERP5/Document/IdGenerator.py
erp5/trunk/products/ERP5/Document/SQLNonContinuousIncreasingIdGenerator.py
erp5/trunk/products/ERP5/Document/ZODBContinuousIncreasingIdGenerator.py
erp5/trunk/products/ERP5/PropertySheet/SQLIdGenerator.py
Modified:
erp5/trunk/products/ERP5/Document/BusinessTemplate.py
erp5/trunk/products/ERP5/ERP5Site.py
erp5/trunk/products/ERP5/Tool/IdTool.py
erp5/trunk/products/ERP5/Tool/TemplateTool.py
erp5/trunk/products/ERP5/tests/testIdTool.py
Modified: erp5/trunk/products/ERP5/Document/BusinessTemplate.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/BusinessTemplate.py?rev=34543&r1=34542&r2=34543&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Document/BusinessTemplate.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Document/BusinessTemplate.py [utf8] Wed Apr 14 13:22:48 2010
@@ -55,7 +55,7 @@
writeLocalTest, \
removeLocalTest
from Products.ERP5Type.Utils import convertToUpperCase
-from Products.ERP5Type import Permissions, PropertySheet
+from Products.ERP5Type import Permissions, PropertySheet, interfaces
from Products.ERP5Type.XMLObject import XMLObject
from OFS.Traversable import NotFound
from OFS import SimpleItem, XMLExportImport
@@ -591,6 +591,10 @@
obj.deletePdfContent()
elif meta_type == 'Script (Python)':
obj._code = None
+ elif interfaces.IIdGenerator.providedBy(obj):
+ for dict_name in ('last_max_id_dict', 'last_id_dict'):
+ if getattr(obj, dict_name, None) is not None:
+ setattr(obj, dict_name, None)
return obj
def getTemplateTypeName(self):
@@ -1064,6 +1068,14 @@
obj._setProperty(
'business_template_registered_skin_selections',
skin_selection_list, type='tokens')
+ # in case the portal ids, we want keep the property dict
+ elif interfaces.IIdGenerator.providedBy(obj) and \
+ old_obj is not None:
+ for dict_name in ('last_max_id_dict', 'last_id_dict'):
+ # Keep previous last id dict
+ if getattr(old_obj, dict_name, None) is not None:
+ old_dict = getattr(old_obj, dict_name, None)
+ setattr(obj, dict_name, old_dict)
recurse(restoreHook, obj)
# now put original order group
Added: erp5/trunk/products/ERP5/Document/IdGenerator.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/IdGenerator.py?rev=34543&view=auto
==============================================================================
--- erp5/trunk/products/ERP5/Document/IdGenerator.py (added)
+++ erp5/trunk/products/ERP5/Document/IdGenerator.py [utf8] Wed Apr 14 13:22:48 2010
@@ -1,0 +1,139 @@
+##############################################################################
+#
+# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
+# Daniele Vanbaelinghem <daniele at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+import zope.interface
+from AccessControl import ClassSecurityInfo
+from Products.ERP5Type import Permissions, PropertySheet, Constraint, interfaces
+from Products.ERP5Type.Cache import caching_instance_method
+from Products.ERP5Type.Base import Base
+from Products.CMFCore.utils import getToolByName
+from zLOG import LOG, INFO
+
+class IdGenerator(Base):
+ """
+ Generator of Ids
+ """
+ zope.interface.implements(interfaces.IIdGenerator)
+ # CMF Type Definition
+ meta_type = 'ERP5 Id Generator'
+ portal_type = 'Id Generator'
+ add_permission = Permissions.AddPortalContent
+
+ # Declarative security
+ security = ClassSecurityInfo()
+ security.declareObjectProtected(Permissions.AccessContentsInformation)
+
+ # Declarative property
+ property_sheets = ( PropertySheet.Base,
+ PropertySheet.Version,
+ PropertySheet.Reference)
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'getLatestVersionValue')
+ def getLatestVersionValue(self, **kw):
+   """
+   Return the generator of portal_ids selected by _getLatestIdGenerator
+   for the reference of this generator.
+
+   Extra keyword arguments are accepted but not used.
+   Raises AttributeError when the portal_ids tool cannot be found.
+   """
+   # No default value is given on purpose: a missing tool must fail with
+   # an AttributeError naming 'portal_ids' rather than with an obscure
+   # error raised later on None
+   id_tool = getToolByName(self, 'portal_ids')
+   latest_generator_id = id_tool._getLatestIdGenerator(self.getReference())
+   return id_tool._getOb(latest_generator_id)
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'generateNewId')
+ def generateNewId(self, id_group=None, default=None):
+   """
+   Return the next id in the sequence of ids of the group id_group.
+
+   The work is delegated to the latest version of the generator given
+   by the specialise value; id_group and default are passed unchanged.
+
+   Raises AttributeError when getSpecialiseValue raises AttributeError,
+   and ValueError when the specialise value is None.
+   """
+   try:
+     specialised_generator = self.getSpecialiseValue()
+   except AttributeError:
+     raise AttributeError('specialise is not defined')
+   if specialised_generator is None:
+     message = "the id generator %s doesn't have specialise value" % \
+         self.getReference()
+     raise ValueError(message)
+   latest_generator = specialised_generator.getLatestVersionValue()
+   return latest_generator.generateNewId(id_group=id_group, default=default)
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'generateNewIdList')
+ def generateNewIdList(self, id_group=None, id_count=1, default=None):
+   """
+   Return a list of next ids in the sequence of ids of the group id_group.
+
+   The work is delegated to the latest version of the generator given
+   by the specialise value; all the arguments are passed unchanged.
+
+   Raises AttributeError when id_group is not a str or when
+   getSpecialiseValue raises AttributeError, and ValueError when the
+   specialise value is None.
+   """
+   # For compatibility with sql data, id_group must not be a list
+   if not isinstance(id_group, str):
+     raise AttributeError('id_group is not a string')
+   try:
+     specialised_generator = self.getSpecialiseValue()
+   except AttributeError:
+     raise AttributeError('specialise is not defined')
+   if specialised_generator is None:
+     message = "the id generator %s doesn't have specialise value" % \
+         self.getReference()
+     raise ValueError(message)
+   latest_generator = specialised_generator.getLatestVersionValue()
+   return latest_generator.generateNewIdList(id_group=id_group,
+       id_count=id_count, default=default)
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'initializeGenerator')
+ def initializeGenerator(self):
+   """
+   Initialize generator. This is mostly used when a new ERP5 site
+   is created.
+
+   Nothing is initialized here: the call is forwarded to the latest
+   version of the generator given by the specialise value.
+
+   Raises ValueError when the specialise value is None.
+   """
+   specialised_generator = self.getSpecialiseValue()
+   if specialised_generator is None:
+     message = "the id generator %s doesn't have specialise value" % \
+         self.getReference()
+     raise ValueError(message)
+   latest_generator = specialised_generator.getLatestVersionValue()
+   latest_generator.initializeGenerator()
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'clearGenerator')
+ def clearGenerator(self):
+   """
+   Clear the data of the generator. This can be useful on a development
+   instance or in some other rare cases. Data is lost, so this must be
+   used with caution.
+
+   Nothing is cleared here: the call is forwarded to the latest version
+   of the generator given by the specialise value. Some generator
+   implementations may not support it.
+
+   Raises ValueError when the specialise value is None.
+   """
+   specialised_generator = self.getSpecialiseValue()
+   if specialised_generator is None:
+     message = "the id generator %s doesn't have specialise value" % \
+         self.getReference()
+     raise ValueError(message)
+   latest_generator = specialised_generator.getLatestVersionValue()
+   latest_generator.clearGenerator()
+
+
Added: erp5/trunk/products/ERP5/Document/SQLNonContinuousIncreasingIdGenerator.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/SQLNonContinuousIncreasingIdGenerator.py?rev=34543&view=auto
==============================================================================
--- erp5/trunk/products/ERP5/Document/SQLNonContinuousIncreasingIdGenerator.py (added)
+++ erp5/trunk/products/ERP5/Document/SQLNonContinuousIncreasingIdGenerator.py [utf8] Wed Apr 14 13:22:48 2010
@@ -1,0 +1,227 @@
+##############################################################################
+#
+# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
+# Daniele Vanbaelinghem <daniele at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+import zope.interface
+from Acquisition import aq_base
+from AccessControl import ClassSecurityInfo
+from Products.ERP5Type.Globals import PersistentMapping
+from Products.ERP5Type import Permissions, PropertySheet, interfaces
+from Products.ERP5.Document.IdGenerator import IdGenerator
+from _mysql_exceptions import ProgrammingError
+from zLOG import LOG, INFO
+
+import persistent
+
+class LastMaxGeneratedId(persistent.Persistent):
+  """
+  Store the last id generated.
+  The object supports application-level conflict resolution: when
+  several states compete, the greatest one is kept.
+  """
+
+  def __init__(self, value=0):
+    self.value = value
+
+  def __getstate__(self):
+    # The persistent state is the bare value, so bare values are also
+    # what _p_resolveConflict has to compare
+    return self.value
+
+  def __setstate__(self, value):
+    self.value = value
+
+  def set(self, value):
+    self.value = value
+
+  def _p_resolveConflict(self, first_id, second_id, *other_ids):
+    """
+    Resolve a write conflict by returning the greatest of the given states.
+
+    ZODB passes three states (old, saved, new) to this hook, which the
+    former two-state signature could not accept. Calls giving only two
+    states are still supported.
+    """
+    return max((first_id, second_id) + other_ids)
+
+
+class SQLNonContinuousIncreasingIdGenerator(IdGenerator):
+ """
+ Generate some ids with mysql storage and also zodb is enabled
+ by the checkbox : StoredInZodb
+ """
+ zope.interface.implements(interfaces.IIdGenerator)
+ # CMF Type Definition
+ meta_type = 'ERP5 SQL Non Continous Increasing Id Generator'
+ portal_type = 'SQL Non Continous Increasing Id Generator'
+ add_permission = Permissions.AddPortalContent
+
+ # Declarative security
+ security = ClassSecurityInfo()
+ security.declareObjectProtected(Permissions.AccessContentsInformation)
+
+ # Declarative property
+ property_sheets = (PropertySheet.SQLIdGenerator,
+ ) + IdGenerator.property_sheets
+
+ def _generateNewId(self, id_group, id_count=1, default=None):
+   """
+   Return the new last id of id_group, as given by the
+   IdTool_zGenerateId ZSQL method.
+
+   When the stored_in_zodb property is set, the id is also recorded in
+   last_max_id_dict as a LastMaxGeneratedId, an object which resolves
+   ZODB conflicts by itself.
+
+   id_group -- name of the group of ids, None and 'None' are refused
+   id_count -- passed as is to IdTool_zGenerateId
+   default -- passed as is to IdTool_zGenerateId, 0 when None
+
+   Raises ValueError for an invalid id_group and AttributeError when one
+   of the required ZSQL methods cannot be found.
+   """
+   # Check the arguments
+   if id_group in (None, 'None'):
+     raise ValueError('%s is not a valid group Id.' % (repr(id_group), ))
+   if default is None:
+     default = 0
+
+   # Retrieve the zsql methods
+   portal = self.getPortalObject()
+   generate_id_method = getattr(portal, 'IdTool_zGenerateId', None)
+   commit_method = getattr(portal, 'IdTool_zCommit', None)
+   get_last_id_method = getattr(portal, 'IdTool_zGetLastId', None)
+   if None in (generate_id_method, commit_method, get_last_id_method):
+     raise AttributeError('Error while generating Id: '
+         'IdTool_zGenerateId and/or IdTool_zCommit and/or IdTool_zGetLastId '
+         'could not be found.')
+   try:
+     result_query = generate_id_method(id_group=id_group, id_count=id_count,
+         default=default)
+   except ProgrammingError:
+     # The query itself is what fails when the table is missing:
+     # initialise the generator, then ask again so that new_id is
+     # always defined below
+     self.initializeGenerator()
+     result_query = generate_id_method(id_group=id_group, id_count=id_count,
+         default=default)
+   new_id = result_query[0]['LAST_INSERT_ID()']
+   # Commit the change of the last id
+   commit_method()
+   if self.getStoredInZodb():
+     # Read the mapping without assigning it back to self: an assignment
+     # on each call would mark the generator itself as modified
+     last_max_id_dict = getattr(aq_base(self), 'last_max_id_dict', None)
+     if last_max_id_dict is None:
+       # If the dictionary does not exist, initialize the generator
+       self.initializeGenerator()
+       last_max_id_dict = self.last_max_id_dict
+     last_max_id = last_max_id_dict.get(id_group, None)
+     if last_max_id is None:
+       last_max_id_dict[id_group] = LastMaxGeneratedId(new_id)
+     else:
+       last_max_id.set(new_id)
+   return new_id
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'generateNewId')
+ def generateNewId(self, id_group=None, default=None):
+   """
+   Return one new id for the group id_group, as computed by _generateNewId
+   """
+   return self._generateNewId(id_group=id_group, default=default)
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'generateNewIdList')
+ def generateNewIdList(self, id_group=None, id_count=1, default=None):
+   """
+   Return id_count new ids for the group id_group: the range of integers
+   which ends with the id returned by _generateNewId
+   """
+   last_new_id = self._generateNewId(id_group=id_group, id_count=id_count,
+       default=default)
+   first_new_id = last_new_id - id_count + 1
+   return range(first_new_id, last_new_id + 1)
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'initializeGenerator')
+ def initializeGenerator(self):
+   """
+   Initialize generator. This is mostly used when a new ERP5 site
+   is created.
+
+   - create the last_max_id_dict mapping when it is missing
+   - drop and create the SQL table again when IdTool_zGetValueList fails
+   - compatibility: dump the ids of the previous storage (the mapping of
+     this generator, else portal_ids.dict_length_ids) into SQL, unless
+     SQL already holds a greater id
+
+   Raises AttributeError when one of the required ZSQL methods cannot
+   be found.
+   """
+   LOG('initialize SQL Generator', INFO, 'Id Generator: %s' % (self,))
+   # Check the dictionary; aq_base so that only the mapping stored on
+   # this generator itself is considered
+   if getattr(aq_base(self), 'last_max_id_dict', None) is None:
+     self.last_max_id_dict = PersistentMapping()
+   # Create the table if it can not be queried
+   portal = self.getPortalObject()
+   get_value_list = getattr(portal, 'IdTool_zGetValueList', None)
+   if get_value_list is None:
+     raise AttributeError('Error while initialize generator: '
+         'IdTool_zGetValueList could not be found.')
+   try:
+     get_value_list()
+   except ProgrammingError:
+     drop_method = getattr(portal, 'IdTool_zDropTable', None)
+     create_method = getattr(portal, 'IdTool_zCreateEmptyTable', None)
+     if None in (drop_method, create_method):
+       raise AttributeError('Error while initialize generator: '
+           'IdTool_zDropTable and/or IdTool_zCreateEmptyTable '
+           'could not be found.')
+     drop_method()
+     create_method()
+
+   # XXX compatibility code below, dump the old dictionaries
+   # Retrieve the zsql methods
+   portal_ids = getattr(self, 'portal_ids', None)
+   get_last_id_method = getattr(portal, 'IdTool_zGetLastId', None)
+   set_last_id_method = getattr(portal, 'IdTool_zSetLastId', None)
+   if None in (get_last_id_method, set_last_id_method):
+     raise AttributeError('Error while generating Id: '
+         'IdTool_zGetLastId and/or IdTool_zSetLastId could not be found.')
+   storage = self.getStoredInZodb()
+   # Ids to dump: the mapping of this generator when it has entries, else
+   # the old dict_length_ids of portal_ids, else nothing at all
+   if len(self.last_max_id_dict) != 0:
+     dump_dict = self.last_max_id_dict
+   else:
+     dump_dict = getattr(portal_ids, 'dict_length_ids', None)
+     if dump_dict is None:
+       dump_dict = {}
+   for id_group, last_id in dump_dict.items():
+     last_insert_id = get_last_id_method(id_group=id_group)
+     if len(last_insert_id) != 0:
+       last_insert_id = last_insert_id[0]['LAST_INSERT_ID()']
+       if last_insert_id > last_id.value:
+         # SQL is ahead: leave it untouched, only refresh the ZODB copy
+         if storage and (id_group not in self.last_max_id_dict or
+             self.last_max_id_dict[id_group].value != last_insert_id):
+           self.last_max_id_dict[id_group] = \
+               LastMaxGeneratedId(last_insert_id)
+         continue
+     # SQL knows nothing about this group or is late: push the old value
+     last_id = int(last_id.value)
+     set_last_id_method(id_group=id_group, last_id=last_id)
+     if storage:
+       self.last_max_id_dict[id_group] = LastMaxGeneratedId(last_id)
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'clearGenerator')
+ def clearGenerator(self):
+   """
+   Clear the data of the generator: last_max_id_dict is replaced by an
+   empty mapping, then IdTool_zDropTable and IdTool_zCreateEmptyTable
+   are called. This can be useful on a development instance or in some
+   other rare cases. Data is lost, so this must be used with caution.
+
+   Raises AttributeError when one of the two ZSQL methods cannot be
+   found on the portal.
+   """
+   # Start again from an empty dictionary
+   self.last_max_id_dict = PersistentMapping()
+   # Remove and recreate the table
+   portal = self.getPortalObject()
+   zsql_method_list = [getattr(portal, method_id, None) for method_id in
+       ('IdTool_zDropTable', 'IdTool_zCreateEmptyTable')]
+   if None in zsql_method_list:
+     raise AttributeError('Error while clear generator: '
+         'idTool_zDropTable and/or idTool_zCreateTable could not be found.')
+   # The list is ordered: drop first, then create
+   for zsql_method in zsql_method_list:
+     zsql_method()
Added: erp5/trunk/products/ERP5/Document/ZODBContinuousIncreasingIdGenerator.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/ZODBContinuousIncreasingIdGenerator.py?rev=34543&view=auto
==============================================================================
--- erp5/trunk/products/ERP5/Document/ZODBContinuousIncreasingIdGenerator.py (added)
+++ erp5/trunk/products/ERP5/Document/ZODBContinuousIncreasingIdGenerator.py [utf8] Wed Apr 14 13:22:48 2010
@@ -1,0 +1,135 @@
+##############################################################################
+#
+# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
+# Daniele Vanbaelinghem <daniele at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+import zope.interface
+from AccessControl import ClassSecurityInfo
+from Products.ERP5Type.Globals import PersistentMapping
+from Products.ERP5Type import Permissions, interfaces
+from Products.ERP5.Document.IdGenerator import IdGenerator
+
+from zLOG import LOG, INFO
+
+class ZODBContinuousIncreasingIdGenerator(IdGenerator):
+ """
+ Create some Ids with the zodb storage
+ """
+ zope.interface.implements(interfaces.IIdGenerator)
+ # CMF Type Definition
+ meta_type = 'ERP5 ZODB Continous Increasing Id Generator'
+ portal_type = 'ZODB Continous Increasing Id Generator'
+ add_permission = Permissions.AddPortalContent
+
+ # Declarative security
+ security = ClassSecurityInfo()
+ security.declareObjectProtected(Permissions.AccessContentsInformation)
+
+ def _generateNewId(self, id_group, id_count=1, default=None):
+   """
+   Compute the new last id of id_group from the one kept in
+   last_id_dict, store it there and return it.
+
+   A group without stored id starts at default (0 when None); when
+   several ids are asked at once, the id returned is the last of them.
+
+   Raises ValueError when id_group is None or 'None'.
+   """
+   if id_group in (None, 'None'):
+     raise ValueError('%s is not a valid group Id.' % (repr(id_group), ))
+   if default is None:
+     default = 0
+   self.last_id_dict = getattr(self, 'last_id_dict', None)
+   if self.last_id_dict is None:
+     # No dictionary yet: the generator has to be initialized
+     self.initializeGenerator()
+   missing = []
+   previous_id = self.last_id_dict.get(id_group, missing)
+   if previous_id is not missing:
+     # Known group: go on from the last id
+     next_id = previous_id + id_count
+   elif id_count > 1:
+     # New group and list of ids: default is the first id of the list
+     next_id = default + id_count - 1
+   else:
+     next_id = default
+   # Remember the new last id
+   self.last_id_dict[id_group] = next_id
+   return next_id
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'generateNewId')
+ def generateNewId(self, id_group=None, default=None):
+   """
+   Return one new id for the group id_group, as computed by _generateNewId
+   """
+   return self._generateNewId(id_group=id_group, default=default)
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'generateNewIdList')
+ def generateNewIdList(self, id_group=None, id_count=1, default=None):
+   """
+   Return id_count new ids for the group id_group: the range of integers
+   which ends with the id returned by _generateNewId
+   """
+   last_new_id = self._generateNewId(id_group=id_group, id_count=id_count,
+       default=default)
+   first_new_id = last_new_id - id_count + 1
+   return range(first_new_id, last_new_id + 1)
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'initializeGenerator')
+ def initializeGenerator(self):
+   """
+   Initialize generator. This is mostly used when a new ERP5 site
+   is created.
+
+   Create the last_id_dict mapping when it is missing, then, for
+   compatibility, copy into it the ids of portal_ids.dict_ids, except
+   for the groups which already have a greater id here.
+   """
+   LOG('initialize ZODB Generator', INFO, 'Id Generator: %s' % (self,))
+   if getattr(self, 'last_id_dict', None) is None:
+     self.last_id_dict = PersistentMapping()
+
+   # XXX compatibility code below, dump the old dictionaries
+   portal_ids = getattr(self, 'portal_ids', None)
+   if getattr(portal_ids, 'dict_ids', None) is None:
+     # Nothing to dump
+     return
+   for id_group, old_last_id in portal_ids.dict_ids.items():
+     # Never go back: a greater id already stored here is kept
+     if not self.last_id_dict.has_key(id_group) or \
+         not self.last_id_dict[id_group] > old_last_id:
+       self.last_id_dict[id_group] = old_last_id
+
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+     'clearGenerator')
+ def clearGenerator(self):
+   """
+   Clear the data of the generator: last_id_dict is replaced by an empty
+   mapping. This can be useful on a development instance or in some
+   other rare cases. Data is lost, so this must be used with caution.
+   """
+   # Start again from an empty dictionary
+   empty_dict = PersistentMapping()
+   self.last_id_dict = empty_dict
+
+
Modified: erp5/trunk/products/ERP5/ERP5Site.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/ERP5Site.py?rev=34543&r1=34542&r2=34543&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/ERP5Site.py [utf8] (original)
+++ erp5/trunk/products/ERP5/ERP5Site.py [utf8] Wed Apr 14 13:22:48 2010
@@ -1848,15 +1848,11 @@
# we don't want to make it crash
if p.erp5_sql_connection_type is not None:
setattr(p, 'isIndexable', ConstantGetter('isIndexable', value=True))
- portal_catalog = p.portal_catalog
# Clear portal ids sql table, like this we do not take
# ids for a previously created web site
- # XXX It's temporary, a New API will be implemented soon
- # the code will be change
- p.IdTool_zDropTable()
- p.IdTool_zCreateTable()
+ p.portal_ids.initializeGenerator(all=True)
# Then clear the catalog and reindex it
- portal_catalog.manage_catalogClear()
+ p.portal_catalog.manage_catalogClear()
# Calling ERP5Site_reindexAll is useless.
def setupUserFolder(self, p):
Added: erp5/trunk/products/ERP5/PropertySheet/SQLIdGenerator.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/PropertySheet/SQLIdGenerator.py?rev=34543&view=auto
==============================================================================
--- erp5/trunk/products/ERP5/PropertySheet/SQLIdGenerator.py (added)
+++ erp5/trunk/products/ERP5/PropertySheet/SQLIdGenerator.py [utf8] Wed Apr 14 13:22:48 2010
@@ -1,0 +1,86 @@
+# -*- coding: utf-8 -*-
+##############################################################################
+#
+# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved.
+# Daniele Vanbaelinghem <daniele at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsibility of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# guarantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from Products.CMFCore.Expression import Expression
+
+class SQLIdGenerator:
+  """
+  Property sheet of the SQL id generators: it declares the flag telling
+  if the last id is stored in the ZODB
+  """
+
+  _properties = (
+    {
+      'id': 'stored_in_zodb',
+      'label': 'A flag indicating if the last_id is stored in the ZODB',
+      'type': 'boolean',
+      'mode': 'w',
+    },
+  )
+# -*- coding: utf-8 -*-
+##############################################################################
+#
+# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved.
+# Daniele Vanbaelinghem <daniele at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsibility of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# guarantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from Products.CMFCore.Expression import Expression
+
class SQLIdGenerator:
  """
  Property sheet of the SQL id generators: it declares the flag telling
  if the last id is stored in the ZODB
  """

  _properties = (
    {
      'id': 'stored_in_zodb',
      'label': 'A flag indicating if the last_id is stored in the ZODB',
      'type': 'boolean',
      'mode': 'w',
    },
  )
Modified: erp5/trunk/products/ERP5/Tool/IdTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Tool/IdTool.py?rev=34543&r1=34542&r2=34543&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Tool/IdTool.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Tool/IdTool.py [utf8] Wed Apr 14 13:22:48 2010
@@ -26,21 +26,27 @@
#
##############################################################################
+import warnings
+import zope.interface
+
from Acquisition import aq_base
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import InitializeClass, DTMLFile, PersistentMapping
from Products.ERP5Type.Tool.BaseTool import BaseTool
-from Products.ERP5Type import Permissions
-from Products.CMFCore.utils import getToolByName
-from zLOG import LOG, WARNING
+from Products.ERP5Type.Cache import caching_instance_method
+from Products.ERP5Type import Permissions, interfaces
+from zLOG import LOG, WARNING, INFO, ERROR
from Products.ERP5 import _dtmldir
from BTrees.Length import Length
+
+_marker = object()
class IdTool(BaseTool):
"""
This tools handles the generation of IDs.
"""
+ zope.interface.implements(interfaces.IIdTool)
id = 'portal_ids'
meta_type = 'ERP5 Id Tool'
portal_type = 'Id Tool'
@@ -51,19 +57,291 @@
security.declareProtected( Permissions.ManagePortal, 'manage_overview' )
manage_overview = DTMLFile( 'explainIdTool', _dtmldir )
+  def newContent(self, *args, **kw):
+    """
+      Create a new sub-document in the tool.
+
+      newContent is overridden so that generateNewId is not used: when the
+      caller gives no 'id' keyword, the id comes from self._generateNextId().
+      All the arguments are then passed to BaseTool.newContent, whose
+      result is returned.
+
+      Raise ValueError if _generateNextId returns None.
+    """
+    # The key to look up is the string 'id'. Testing the builtin function
+    # `id` never matched, so an id given by the caller was always replaced.
+    if 'id' not in kw:
+      new_id = self._generateNextId()
+      if new_id is None:
+        raise ValueError('Failed to gererate id')
+      kw['id'] = new_id
+    return BaseTool.newContent(self, *args, **kw)
+
+  def _get_id(self, id):
+    """
+      Overridden so that generateNewId is not used; this is called for
+      example when an object is cloned.
+      Return the requested id when no sub-object uses it, otherwise the
+      value of _generateNextId.
+    """
+    if self._getOb(id, None) is not None:
+      # the requested id is already taken
+      return self._generateNextId()
+    return id
+
+ @caching_instance_method(id='IdTool._getLatestIdGenerator',
+ cache_factory='erp5_content_long')
+ def _getLatestIdGenerator(self, reference):
+ """
+ Return the id of the generator of portal_ids which has the given
+ reference and the highest version.
+ The sub-objects are walked with objectValues, at low level, so that
+ this can be used to create a site without catalog.
+ The method is wrapped by caching_instance_method with the
+ 'erp5_content_long' cache factory.
+ Raise KeyError if no generator has this reference.
+ """
+ assert reference
+ id_tool = self.getPortalObject().portal_ids
+ id_last_generator = None
+ version_last_generator = 0
+ for generator in id_tool.objectValues():
+ if generator.getReference() == reference:
+ version = generator.getVersion()
+ # NOTE(review): the first comparison is made against the integer 0;
+ # if getVersion returns a string this relies on the Python 2 ordering
+ # of mixed types - confirm
+ if version > version_last_generator:
+ id_last_generator = generator.getId()
+ version_last_generator = generator.getVersion()
+ if id_last_generator is None:
+ raise KeyError, 'The generator %s is not present' % (reference,)
+ return id_last_generator
+
+  # The declaration must name the method defined just below; it used to
+  # name 'getLatestVersionValue', which is not defined here.
+  security.declareProtected(Permissions.AccessContentsInformation,
+                            '_getLatestGeneratorValue')
+  def _getLatestGeneratorValue(self, id_generator):
+    """
+      Return the generator document of portal_ids which has the reference
+      id_generator and the latest version.
+
+      The id of the document is given by _getLatestIdGenerator; the KeyError
+      it raises when no generator has this reference is not caught here.
+    """
+    id_tool = self.getPortalObject().portal_ids
+    return id_tool._getOb(self._getLatestIdGenerator(id_generator))
+
+  security.declareProtected(Permissions.AccessContentsInformation,
+                            'generateNewId')
+  def generateNewId(self, id_group=None, default=None, method=_marker,
+                    id_generator=None):
+    """
+      Generate the next id in the sequence of ids of a particular group
+
+      id_group     -- name of the sequence; None and 'None' are refused,
+                      any other non string value is replaced by its repr
+                      (deprecated)
+      default      -- passed to the generator; in the compatibility code
+                      it is the first id of a new group (0 if None)
+      method       -- deprecated, only used by the compatibility code to
+                      compute the new id from the last one
+      id_generator -- reference of the generator, 'document' if None
+
+      Raise ValueError if id_group is not valid. The KeyError raised while
+      using the generator is raised again if the installed erp5_core has a
+      revision greater than 1561; otherwise the old dict_ids mapping of the
+      tool is used.
+    """
+    if id_group in (None, 'None'):
+      raise ValueError('%s is not a valid group Id.' % (repr(id_group), ))
+    # for compatibilty with sql data, must not use id_group as a list
+    if not isinstance(id_group, str):
+      id_group = repr(id_group)
+      warnings.warn('The id_group must be a string the other types '
+                    'is deprecated.', DeprecationWarning)
+    if id_generator is None:
+      id_generator = 'document'
+    if method is not _marker:
+      warnings.warn("The usage of 'method' argument is deprecated.\n"
+                    "use this method with a id generator without this"
+                    "argument", DeprecationWarning)
+    try:
+      #use _getLatestGeneratorValue here for that the technical level
+      #must not call the method
+      last_generator = self._getLatestGeneratorValue(id_generator)
+      new_id = last_generator.generateNewId(id_group=id_group,
+                                            default=default)
+    except KeyError:
+      # The template tool can be missing: the revision is then unknown and
+      # the compatibility code is used instead of failing on None.
+      template_tool = getattr(self, 'portal_templates', None)
+      revision = None
+      if template_tool is not None:
+        revision = template_tool.getInstalledBusinessTemplateRevision(
+                                                                'erp5_core')
+      # XXX backward compatiblity
+      # Revisions are compared as integers: as strings, '999' would be
+      # greater than '1561'. No exception is used for the conversion, so
+      # that the bare raise below still raises the KeyError.
+      revision = str(revision)
+      if revision.isdigit() and int(revision) > 1561:
+        LOG('generateNewId', ERROR, 'while generating id')
+        raise
+      else:
+        # Compatibility code below, in case the last version of erp5_core
+        # is not installed yet
+        warnings.warn("You use the old version of API which is deprecated.\n"
+                      "please, update the business template erp5_core "
+                      "to use the new API", DeprecationWarning)
+        dict_ids = getattr(aq_base(self), 'dict_ids', None)
+        if dict_ids is None:
+          dict_ids = self.dict_ids = PersistentMapping()
+        # Getting the last id
+        if default is None:
+          default = 0
+        marker = []
+        new_id = dict_ids.get(id_group, marker)
+        if method is _marker:
+          if new_id is marker:
+            new_id = default
+          else:
+            new_id = new_id + 1
+        else:
+          if new_id is marker:
+            new_id = default
+          new_id = method(new_id)
+        # Store the new value
+        dict_ids[id_group] = new_id
+    return new_id
+
+  security.declareProtected(Permissions.AccessContentsInformation,
+                            'generateNewIdList')
+  def generateNewIdList(self, id_group=None, id_count=1, default=None,
+                        store=_marker, id_generator=None):
+    """
+      Generate a list of next ids in the sequence of ids of a particular group
+
+      id_group     -- name of the sequence; None and 'None' are refused,
+                      any other non string value is replaced by its repr
+                      (deprecated)
+      id_count     -- number of ids to generate
+      default      -- passed to the generator; 1 if None in the
+                      compatibility code
+      store        -- deprecated, only used by the compatibility code: if
+                      true the new id is kept in dict_length_ids
+      id_generator -- reference of the generator, 'uid' if None
+
+      Raise ValueError if id_group is not valid. The KeyError raised while
+      using the generator is raised again if the installed erp5_core has a
+      revision greater than 1561; otherwise the old SQL methods are used,
+      and AttributeError is raised if they can not be found.
+    """
+    if id_group in (None, 'None'):
+      raise ValueError('%s is not a valid group Id.' % (repr(id_group), ))
+    # for compatibilty with sql data, must not use id_group as a list
+    if not isinstance(id_group, str):
+      id_group = repr(id_group)
+      warnings.warn('The id_group must be a string the other types '
+                    'is deprecated.', DeprecationWarning)
+    if id_generator is None:
+      id_generator = 'uid'
+    if store is not _marker:
+      warnings.warn("The usage of 'store' argument is deprecated.\n"
+                    "use this method with a id generator and without this."
+                    "argument", DeprecationWarning)
+    try:
+      #use _getLatestGeneratorValue here for that the technical level
+      #must not call the method
+      last_generator = self._getLatestGeneratorValue(id_generator)
+      new_id_list = last_generator.generateNewIdList(id_group=id_group,
+                         id_count=id_count, default=default)
+    except KeyError:
+      # The template tool can be missing: the revision is then unknown and
+      # the compatibility code is used instead of failing on None.
+      template_tool = getattr(self, 'portal_templates', None)
+      revision = None
+      if template_tool is not None:
+        revision = template_tool.getInstalledBusinessTemplateRevision(
+                                                                'erp5_core')
+      # XXX backward compatiblity
+      # Revisions are compared as integers: as strings, '999' would be
+      # greater than '1561'. No exception is used for the conversion, so
+      # that the bare raise below still raises the KeyError.
+      revision = str(revision)
+      if revision.isdigit() and int(revision) > 1561:
+        LOG('generateNewIdList', ERROR, 'while generating id')
+        raise
+      else:
+        # Compatibility code below, in case the last version of erp5_core
+        # is not installed yet
+        warnings.warn("You use the old version of erp5_core to generate the ids.\n"
+                      "please, update the business template erp5_core "
+                      "to have the new id generators", DeprecationWarning)
+        if default is None:
+          default = 1
+        # XXX It's temporary, a New API will be implemented soon
+        #     the code will be change
+        portal = self.getPortalObject()
+        query = getattr(portal, 'IdTool_zGenerateId', None)
+        commit = getattr(portal, 'IdTool_zCommit', None)
+
+        if query is None or commit is None:
+          portal_catalog = getattr(self, 'portal_catalog').getSQLCatalog()
+          # A default is given so that a missing method is reported by the
+          # explicit error below.
+          query = getattr(portal_catalog, 'z_portal_ids_generate_id', None)
+          commit = getattr(portal_catalog, 'z_portal_ids_commit', None)
+        if None in (query, commit):
+          raise AttributeError('Error while generating Id: '
+            'idTool_zGenerateId and/or idTool_zCommit could not '
+            'be found.')
+        try:
+          result = query(id_group=id_group, id_count=id_count, default=default)
+        finally:
+          # commit is called even if the query failed
+          commit()
+        new_id = result[0]['LAST_INSERT_ID()']
+        # store is still _marker, which is true, if the argument was not given
+        if store:
+          if getattr(aq_base(self), 'dict_length_ids', None) is None:
+            # Length objects are stored in a persistent mapping: there is one
+            # Length object per id_group.
+            self.dict_length_ids = PersistentMapping()
+          if self.dict_length_ids.get(id_group) is None:
+            self.dict_length_ids[id_group] = Length(new_id)
+          self.dict_length_ids[id_group].set(new_id)
+        new_id_list = range(new_id - id_count, new_id)
+    return new_id_list
+
+  security.declareProtected(Permissions.AccessContentsInformation,
+                            'initializeGenerator')
+  def initializeGenerator(self, id_generator=None, all=False):
+    """
+      Initialize generators; mostly used when a new ERP5 site is created,
+      as some generators need an initialization (creating SQL Database,
+      preparing some data in ZODB, etc).
+
+      id_generator -- reference of the generator to initialize
+      all          -- if true, id_generator is ignored and every
+                      'Application Id Generator' of the tool is initialized
+    """
+    if all:
+      # recover all the application generators and initialize them
+      for generator in self.objectValues(
+                           portal_type='Application Id Generator'):
+        generator.initializeGenerator()
+      return
+    # _getLatestGeneratorValue is used here for that the technical level
+    # must not call the method
+    self._getLatestGeneratorValue(id_generator).initializeGenerator()
+
+ security.declareProtected(Permissions.AccessContentsInformation,
+ 'clearGenerator')
+ def clearGenerator(self, id_generator=None, all=False):
+ """
+ Clear generators data. This can be useful when working on a
+ development instance or in some other rare cases. This will
+ lose data and must be used with caution.
+
+ id_generator is the reference of the generator to clear. If all is
+ true, id_generator is ignored and every 'Application Id Generator'
+ of the tool is cleared.
+
+ This can be incompatible with some particular generator implementation,
+ in this case a particular error will be raised (to be determined and
+ added here)
+ """
+ if not all:
+ #Use _getLatestGeneratorValue here for that the technical level
+ #must not call the method
+ last_generator = self._getLatestGeneratorValue(id_generator)
+ last_generator.clearGenerator()
+
+ else:
+ # NOTE(review): the indentation is lost in this archive; presumably only
+ # the two IdTool_z* calls belong to the test below - confirm
+ if len(self.objectValues()) == 0:
+ # compatibility with old API
+ self.getPortalObject().IdTool_zDropTable()
+ self.getPortalObject().IdTool_zCreateTable()
+ for generator in self.objectValues(\
+ portal_type='Application Id Generator'):
+ generator.clearGenerator()
+
+ ## XXX Old API, deprecated
+ # Backward compatibility: the old name is an alias of generateNewIdList
+ generateNewLengthIdList = generateNewIdList
+
+ # used by an alarm
+ security.declareProtected(Permissions.AccessContentsInformation,
+ 'getLastLengthGeneratedId')
+ def getLastLengthGeneratedId(self, id_group, default=None):
+ """
+ Get the last length id generated (deprecated).
+
+ The value kept in dict_length_ids for id_group is used if there is
+ one, otherwise the SQL method IdTool_zGetLastId (or
+ z_portal_ids_get_last_id of the SQL catalog) is called. In both cases
+ the stored value minus 1 is returned. default is returned when the
+ query gives no result.
+ """
+ warnings.warn('The usage of this method is deprecated.\n'
+ , DeprecationWarning)
+ # check in persistent mapping if exists
+ if getattr(aq_base(self), 'dict_length_ids', None) is not None:
+ last_id = self.dict_length_ids.get(id_group)
+ if last_id is not None:
+ return last_id.value - 1
+ # otherwise check in mysql
+ # XXX It's temporary, a New API will be implemented soon
+ # the code will be change
+ portal = self.getPortalObject()
+ query = getattr(portal, 'IdTool_zGetLastId', None)
+ if query is None:
+ portal_catalog = getattr(self, 'portal_catalog').getSQLCatalog()
+ # NOTE(review): this getattr has no default, so the test on None below
+ # looks unreachable for a missing method - confirm
+ query = getattr(portal_catalog, 'z_portal_ids_get_last_id')
+ if query is None:
+ raise AttributeError, 'Error while getting last Id: ' \
+ 'IdTool_zGetLastId could not ' \
+ 'be found.'
+ result = query(id_group=id_group)
+ if len(result):
+ return result[0]['last_id'] - 1
+ return default
+
+ #use in erp5_accounting
security.declareProtected(Permissions.AccessContentsInformation,
'getLastGeneratedId')
def getLastGeneratedId(self, id_group=None, default=None):
"""
Get the last id generated
"""
+ warnings.warn('The usage of this method is deprecated.\n'
+ , DeprecationWarning)
if getattr(aq_base(self), 'dict_ids', None) is None:
self.dict_ids = PersistentMapping()
last_id = None
if id_group is not None and id_group != 'None':
last_id = self.dict_ids.get(id_group, default)
return last_id
-
+
+ #use in the unit tests
security.declareProtected(Permissions.ModifyPortalContent,
'setLastGeneratedId')
def setLastGeneratedId(self, new_id, id_group=None):
@@ -75,37 +353,21 @@
self.dict_ids = PersistentMapping()
if id_group is not None and id_group != 'None':
self.dict_ids[id_group] = new_id
-
- security.declareProtected(Permissions.AccessContentsInformation,
- 'generateNewId')
- def generateNewId(self, id_group=None, default=None, method=None):
- """
- Generate a new Id
- """
-
- dict_ids = getattr(aq_base(self), 'dict_ids', None)
- if dict_ids is None:
- dict_ids = self.dict_ids = PersistentMapping()
-
- new_id = None
- if id_group is not None and id_group != 'None':
- # Getting the last id
- if default is None:
- default = 0
- marker = []
- new_id = dict_ids.get(id_group, marker)
- if method is None:
- if new_id is marker:
- new_id = default
- else:
- new_id = new_id + 1
- else:
- if new_id is marker:
- new_id = default
- new_id = method(new_id)
- # Store the new value
- dict_ids[id_group] = new_id
- return new_id
+
+ # use several files
+  security.declareProtected(Permissions.AccessContentsInformation,
+                            'generateNewLengthId')
+  def generateNewLengthId(self, id_group=None, default=None, store=1):
+    """
+      Deprecated: generate a single id.
+      This is the first item of the list returned by generateNewIdList
+      called with id_count=1; see that method for the arguments.
+    """
+    warnings.warn('The usage of this method is deprecated.\n'
+                  'use directly generateNewIdList'
+                  ' with a id_generator sql', DeprecationWarning)
+    id_list = self.generateNewIdList(id_group=id_group, id_count=1,
+                                     default=default, store=store)
+    return id_list[0]
security.declareProtected(Permissions.AccessContentsInformation,
'getDictLengthIdsItems')
@@ -124,13 +386,8 @@
"""
Store persistently data from SQL table portal_ids.
"""
- # XXX It's temporary, a New API will be implemented soon
- # the code will be change
- portal = self.getPortalObject()
- query = getattr(portal, 'IdTool_zDump', None)
- if query is None:
- portal_catalog = getToolByName(self, 'portal_catalog').getSQLCatalog()
- query = getattr(portal_catalog, 'z_portal_ids_dump')
+ portal_catalog = getattr(self, 'portal_catalog').getSQLCatalog()
+ query = getattr(portal_catalog, 'z_portal_ids_dump')
dict_length_ids = getattr(aq_base(self), 'dict_length_ids', None)
if dict_length_ids is None:
dict_length_ids = self.dict_length_ids = PersistentMapping()
@@ -150,97 +407,4 @@
'than SQL value (%r). Keeping ZODB value untouched.' % \
(stored_last_id, id_group, last_id))
- security.declareProtected(Permissions.AccessContentsInformation,
- 'getLastLengthGeneratedId')
- def getLastLengthGeneratedId(self, id_group, default=None):
- """
- Get the last length id generated
- """
- # check in persistent mapping if exists
- if getattr(aq_base(self), 'dict_length_ids', None) is not None:
- last_id = self.dict_length_ids.get(id_group)
- if last_id is not None:
- return last_id.value - 1
- # otherwise check in mysql
- # XXX It's temporary, a New API will be implemented soon
- # the code will be change
- portal = self.getPortalObject()
- query = getattr(portal, 'IdTool_zGetLastId', None)
- if query is None:
- portal_catalog = getToolByName(self, 'portal_catalog').getSQLCatalog()
- query = getattr(portal_catalog, 'z_portal_ids_get_last_id')
- if query is None:
- raise AttributeError, 'Error while getting last Id: ' \
- 'IdTool_zGetLastId could not ' \
- 'be found.'
- result = query(id_group=id_group)
- if len(result):
- return result[0]['last_id'] - 1
- return default
-
- security.declareProtected(Permissions.AccessContentsInformation,
- 'generateNewLengthIdList')
- def generateNewLengthIdList(self, id_group=None, id_count=1, default=None,
- store=True):
- """
- Generates a list of Ids.
- The ids are generated using mysql and then stored in a Length object in a
- persistant mapping to be persistent.
- We use MySQL to generate IDs, because it is atomic and we don't want
- to generate any conflict at zope level. The possible downfall is that
- some IDs might be skipped because of failed transactions.
- "Length" is because the id is stored in a python object inspired by
- BTrees.Length. It doesn't have to be a length.
-
- store : if we want to store the new id into the zodb, we want it
- by default
- """
- new_id = None
- if id_group in (None, 'None'):
- raise ValueError, '%s is not a valid group Id.' % (repr(id_group), )
- if not isinstance(id_group, str):
- id_group = repr(id_group)
- if default is None:
- default = 1
- # FIXME: A skin folder should be used to contain ZSQLMethods instead of
- # default catalog, like activity tool (anyway, it uses activity tool
- # ZSQLConnection, so hot reindexing is not helping here).
- # XXX It's temporary, a New API will be implemented soon
- # the code will be change
- portal = self.getPortalObject()
- query = getattr(portal, 'IdTool_zGenerateId', None)
- commit = getattr(portal, 'IdTool_zCommit', None)
- if query is None or commit is None:
- portal_catalog = getToolByName(self, 'portal_catalog').getSQLCatalog()
- query = getattr(portal_catalog, 'z_portal_ids_generate_id')
- commit = getattr(portal_catalog, 'z_portal_ids_commit')
- if None in (query, commit):
- raise AttributeError, 'Error while generating Id: ' \
- 'idTool_zGenerateId and/or idTool_zCommit could not ' \
- 'be found.'
- try:
- result = query(id_group=id_group, id_count=id_count, default=default)
- finally:
- commit()
- new_id = result[0]['LAST_INSERT_ID()']
- if store:
- if getattr(aq_base(self), 'dict_length_ids', None) is None:
- # Length objects are stored in a persistent mapping: there is one
- # Length object per id_group.
- self.dict_length_ids = PersistentMapping()
- if self.dict_length_ids.get(id_group) is None:
- self.dict_length_ids[id_group] = Length(new_id)
- self.dict_length_ids[id_group].set(new_id)
- return range(new_id - id_count, new_id)
-
- security.declareProtected(Permissions.AccessContentsInformation,
- 'generateNewLengthId')
- def generateNewLengthId(self, id_group=None, default=None, store=1):
- """
- Generates an Id.
- See generateNewLengthIdList documentation for details.
- """
- return self.generateNewLengthIdList(id_group=id_group, id_count=1,
- default=default, store=store)[0]
-
InitializeClass(IdTool)
Modified: erp5/trunk/products/ERP5/Tool/TemplateTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Tool/TemplateTool.py?rev=34543&r1=34542&r2=34543&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Tool/TemplateTool.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Tool/TemplateTool.py [utf8] Wed Apr 14 13:22:48 2010
@@ -128,6 +128,14 @@
if bt.getInstallationState() == 'installed':
installed_bts.append(bt)
return installed_bts
+
+  def getInstalledBusinessTemplateRevision(self, title, **kw):
+    """
+      Return the revision of business template installed with the title
+      given, or None if getInstalledBusinessTemplate gives no business
+      template for this title.
+
+      The extra keyword arguments are accepted but not used.
+    """
+    bt = self.getInstalledBusinessTemplate(title)
+    if bt is None:
+      # presumably the value returned for a business template which is not
+      # installed - TODO confirm. Do not fail on None.getRevision().
+      return None
+    return bt.getRevision()
def getBuiltBusinessTemplatesList(self):
"""Deprecated.
Modified: erp5/trunk/products/ERP5/tests/testIdTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testIdTool.py?rev=34543&r1=34542&r2=34543&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/tests/testIdTool.py [utf8] (original)
+++ erp5/trunk/products/ERP5/tests/testIdTool.py [utf8] Wed Apr 14 13:22:48 2010
@@ -3,6 +3,7 @@
#
# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
# Aurélien Calonne <aurel at nexedi.com>
+# Danièle Vanbaelinghem <daniele at nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
@@ -35,99 +36,228 @@
class TestIdTool(ERP5TypeTestCase):
# Different variables used for this test
- run_all_test = 1
- resource_type='Apparel Component'
- resource_variation_type='Apparel Component Variation'
- resource_module = 'apparel_component_module'
-
- def getTitle(self):
- """
- """
- return "Id Tool"
-
- def getBusinessTemplateList(self):
- """
- Return the list of business templates.
-
- """
- return ()
-
- def test_getLastLengthGeneratedId(self, quiet=0, run=run_all_test):
- if not run: return
- if not quiet:
- self.logMessage('Check getLastLengthGeneratedId method')
- idtool = self.portal.portal_ids
- # test with value stored into zodb
- new_id = idtool.generateNewLengthId(id_group=4, store=1)
+ def afterSetUp(self):
+ """
+ Log in, keep the portal and portal_ids on the test case and create
+ the generators used by the tests.
+ """
+ self.login()
+ self.portal = self.getPortal()
+ self.id_tool = self.portal.portal_ids
+ self.createGenerators()
transaction.commit()
self.tic()
- last_id = idtool.getLastLengthGeneratedId(id_group=4)
- self.assertEqual(new_id, last_id)
- # same test without storing value into zodb
- new_id = idtool.generateNewLengthId(id_group=5, store=0)
+
+  def beforeTearDown(self):
+    """
+      Clear then initialize again all the generators after each test
+    """
+    id_tool = self.id_tool
+    id_tool.clearGenerator(all=True)
+    id_tool.initializeGenerator(all=True)
+
+  def getTitle(self):
+    """
+      Title of this test case
+    """
+    title = "Test Id Tool"
+    return title
+
+ def createGenerators(self):
+ """
+ Initialize some generators for the tests: two chains of three
+ documents created in portal_ids and linked with setSpecialiseValue,
+ application generator -> conceptual generator -> SQL or ZODB generator.
+ """
+ # chain ending with the SQL generator
+ self.application_sql_generator = self.id_tool.newContent(\
+ portal_type='Application Id Generator',
+ reference='test_application_sql',
+ version='001')
+ self.conceptual_sql_generator = self.id_tool.newContent(\
+ portal_type='Conceptual Id Generator',
+ reference='test_non_continuous_increasing',
+ version='001')
+ self.sql_generator = self.id_tool.newContent(\
+ portal_type='SQL Non Continuous Increasing Id Generator',
+ reference='test_sql_non_continuous_increasing',
+ version='001')
+ self.application_sql_generator.setSpecialiseValue(\
+ self.conceptual_sql_generator)
+ self.conceptual_sql_generator.setSpecialiseValue(self.sql_generator)
+
+ # chain ending with the ZODB generator
+ self.application_zodb_generator = self.id_tool.newContent(\
+ portal_type='Application Id Generator',
+ reference='test_application_zodb',
+ version='001')
+ self.conceptual_zodb_generator = self.id_tool.newContent(\
+ portal_type='Conceptual Id Generator',
+ reference='test_continuous_increasing',
+ version='001')
+ self.zodb_generator = self.id_tool.newContent(\
+ portal_type='ZODB Continuous Increasing Id Generator',
+ reference='test_zodb_continuous_increasing',
+ version='001')
+ self.application_zodb_generator.setSpecialiseValue(\
+ self.conceptual_zodb_generator)
+ self.conceptual_zodb_generator.setSpecialiseValue(self.zodb_generator)
+
+  def getLastGenerator(self, id_generator):
+    """
+      Return the generator found at the end of the chain which starts with
+      the first document of portal_ids having the reference id_generator
+    """
+    found = self.id_tool.searchFolder(reference=id_generator)[0]
+    generator = found.getLatestVersionValue()
+    # go down two levels: application -> conceptual -> last generator,
+    # taking the latest version at each level
+    for level in range(2):
+      generator = generator.getSpecialiseValue().getLatestVersionValue()
+    return generator
+
+ def test_01a_checkVersionGenerator(self):
+ """
+ Add technical generator with a higher version
+ test if the id_tool use the last version of a generator with the same
+ reference
+ """
+ conceptual_sql_generator = self.application_sql_generator.getSpecialiseValue().\
+ getLatestVersionValue()
+ last_sql = self.conceptual_sql_generator.getSpecialiseValue().\
+ getLatestVersionValue()
+ self.assertEquals(last_sql.getVersion(), '001')
+ # Create new id generator with a more higher version
+ sql_generator_2 = self.id_tool.newContent(\
+ portal_type='SQL Non Continuous Increasing Id Generator',
+ reference='test_sql_non_continuous_increasing',
+ version='002')
+ conceptual_sql_generator.setSpecialiseValue(sql_generator_2)
transaction.commit()
self.tic()
- last_id = idtool.getLastLengthGeneratedId(id_group=5)
- self.assertEqual(new_id, last_id)
- # test with id_group as tuple
- new_id = idtool.generateNewLengthId(id_group=(6,), store=0)
- transaction.commit()
- self.tic()
- last_id = idtool.getLastLengthGeneratedId(id_group=(6,),)
- self.assertEqual(new_id, last_id)
- # test default value
- last_id = idtool.getLastLengthGeneratedId(id_group=(7,),)
- self.assertEqual(None, last_id)
- last_id = idtool.getLastLengthGeneratedId(id_group=8,default=99)
- self.assertEqual(99, last_id)
- # test the store parameter with an existing value stored in the ZODB
- new_id = idtool.generateNewLengthId(id_group=(9,), store=1)
- transaction.commit()
- self.tic()
- last_id = idtool.getLastLengthGeneratedId(id_group=(9,),)
- self.assertEqual(new_id, last_id)
- new_id = idtool.generateNewLengthId(id_group=(9,), store=0)
- transaction.commit()
- self.tic()
- last_id = idtool.getLastLengthGeneratedId(id_group=(9,),)
- self.assertEqual(new_id, last_id)
-
-
- def test_generateNewId(self, quiet=0, run=run_all_test):
- if not run: return
- if not quiet:
- self.logMessage('Check generateNewId method')
- idtool = self.portal.portal_ids
- # id tool generate ids based on a group
- self.assertEquals(0, idtool.generateNewId(id_group=('a', 'b')))
- self.assertEquals(1, idtool.generateNewId(id_group=('a', 'b')))
- # different groups generate different ids
- self.assertEquals(0, idtool.generateNewId(id_group=('a', 'b', 'c')))
-
- self.assertEquals(2, idtool.generateNewId(id_group=('a', 'b')))
- self.assertEquals(1, idtool.generateNewId(id_group=('a', 'b', 'c')))
-
- # you can pass an initial value
- self.assertEquals(4, idtool.generateNewId(id_group=('a', 'b', 'c', 'd'),
- default=4))
- self.assertEquals(5, idtool.generateNewId(id_group=('a', 'b', 'c', 'd'),
- default=4))
- #method to generate a special number
- def generateTestNumber(last_id):
- return ('A%s'%(last_id))
- # you can pass a method
- self.assertEquals('A0', idtool.generateNewId(id_group=('c', 'd'),
- method=generateTestNumber))
- self.assertEquals('AA0', idtool.generateNewId(id_group=('c', 'd'),
- method=generateTestNumber))
-
- self.assertEquals('AA', idtool.generateNewId(id_group=('c', 'd', 'e'),
- default='A',
- method=generateTestNumber))
- self.assertEquals('AAA', idtool.generateNewId(id_group=('c', 'd', 'e'),
- default='A',
- method=generateTestNumber))
-
+ # The last version is cached - reset cache
+ self.portal.portal_caches.clearAllCache()
+ last_sql = self.conceptual_sql_generator.getSpecialiseValue().\
+ getLatestVersionValue()
+ self.assertEquals(last_sql.getVersion(), '002')
+
+  def checkGenerateNewId(self, id_generator):
+    """
+      Check generateNewId of the id tool with the generator given
+    """
+    # (expected id, arguments), checked in this order
+    expected_list = (
+      (0, dict(id_group='a02')),
+      # Different groups generate different ids
+      (0, dict(id_group='b02')),
+      (1, dict(id_group='a02')),
+      # With default value
+      (0, dict(id_group='c02', default=0)),
+      (20, dict(id_group='d02', default=20)),
+      (21, dict(id_group='d02', default=3)),
+    )
+    for expected, kw in expected_list:
+      new_id = self.id_tool.generateNewId(id_generator=id_generator, **kw)
+      self.assertEquals(expected, new_id)
+
+ def test_02a_generateNewIdWithZODBGenerator(self):
+ """
+ Check the generateNewId with a zodb id generator
+ Test that the dictionary of the zodb is filled
+ """
+ # check zodb dict is empty
+ zodb_generator = self.getLastGenerator('test_application_zodb')
+ zodb_portal_type = 'ZODB Continuous Increasing Id Generator'
+ self.assertEquals(zodb_generator.getPortalType(), zodb_portal_type)
+ self.assertEqual(getattr(zodb_generator, 'last_id_dict', None), None)
+ # generate ids
+ self.checkGenerateNewId('test_application_zodb')
+ # check zodb dict
+ # NOTE(review): checkGenerateNewId got 0 for 'c02' and 20 then 21 for
+ # 'd02'; the values expected below (1 and 20) follow neither "last id"
+ # nor "next id" - confirm with the generator implementation
+ self.assertEqual(zodb_generator.last_id_dict['c02'], 1)
+ self.assertEqual(zodb_generator.last_id_dict['d02'], 20)
+
+ def checkGenerateNewIdWithSQL(self, store):
+ """
+ Check the generateNewId with a sql id generator
+ Test that the database is updated and also the zodb dictionary
+ (if the store is True)
+ """
+ # check zodb dict is empty
+ sql_generator = self.getLastGenerator('test_application_sql')
+ sql_portal_type = 'SQL Non Continuous Increasing Id Generator'
+ self.assertEquals(sql_generator.getPortalType(), sql_portal_type)
+ self.assertEqual(getattr(sql_generator, 'last_max_id_dict', None), None)
+ # retrieve the method used to get the last id from the database
+ last_id_method = getattr(self.portal, 'IdTool_zGetLastId', None)
+ self.assertNotEquals(last_id_method, None)
+ # store the ids in zodb
+ if store:
+ sql_generator.setStoredInZodb(True)
+ # generate ids
+ self.checkGenerateNewId('test_application_sql')
+ # check last_id in sql
+ # NOTE(review): IdTool.getLastLengthGeneratedId reads the 'last_id' column
+ # of the same method, and checkGenerateNewId got 0 for 'c02' and 20 then
+ # 21 for 'd02' - confirm the column name and the values expected below
+ self.assertEquals(last_id_method(id_group='c02')[0]['LAST_INSERT_ID()'], 1)
+ self.assertEquals(last_id_method(id_group='d02')[0]['LAST_INSERT_ID()'], 20)
+ # check zodb dict
+ if store:
+ self.assertEqual(sql_generator.last_max_id_dict['c02'].value, 1)
+ self.assertEqual(sql_generator.last_max_id_dict['d02'].value, 20)
+ else:
+ self.assertEqual(getattr(sql_generator, 'last_max_id_dict', None), None)
+
+  def test_02b_generateNewIdWithSQLGeneratorWithoutStorageZODB(self):
+    """
+      generateNewId with the sql generator, the ids being not stored in
+      the ZODB: the database is updated, the zodb dictionary stays empty
+    """
+    store_in_zodb = False
+    self.checkGenerateNewIdWithSQL(store=store_in_zodb)
+
+  def test_02c_generateNewIdWithSQLGeneratorWithStorageZODB(self):
+    """
+      generateNewId with the sql generator, the ids being stored in the
+      ZODB: the database is updated and the zodb dictionary is filled
+    """
+    store_in_zodb = True
+    self.checkGenerateNewIdWithSQL(store=store_in_zodb)
+
+  def checkGenerateNewIdList(self, id_generator):
+    """
+      Check generateNewIdList of the id tool with the generator given
+    """
+    # (expected list, arguments), checked in this order
+    expected_list = (
+      ([0], dict(id_group='a03')),
+      # Different groups generate different ids
+      ([0, 1], dict(id_group='b03', id_count=2)),
+      ([1, 2, 3], dict(id_group='a03', id_count=3)),
+      # With default value
+      ([0, 1, 2], dict(id_group='c03', default=0, id_count=3)),
+      ([20, 21, 22], dict(id_group='d03', default=20, id_count=3)),
+      ([23, 24], dict(id_group='d03', default=3, id_count=2)),
+    )
+    for expected, kw in expected_list:
+      new_id_list = self.id_tool.generateNewIdList(
+                                    id_generator=id_generator, **kw)
+      self.assertEquals(expected, new_id_list)
+
+  def test_03a_generateNewIdListWithZODBGenerator(self):
+    """
+      generateNewIdList checked with the zodb generator
+    """
+    generator_reference = 'test_application_zodb'
+    self.checkGenerateNewIdList(generator_reference)
+
+  def test_03b_generateNewIdListWithSQLGenerator(self):
+    """
+      generateNewIdList checked with the sql generator
+    """
+    generator_reference = 'test_application_sql'
+    self.checkGenerateNewIdList(generator_reference)
+
+ def test_04_generateNewIdAndgenerateNewIdListWithTwoGenerator(self):
+ """
+ Check that the same id_group used with two generators gives
+ independent sequences
+ Check the generateNewIdList and generateNewId in the same test
+ """
+ # zodb generator: the list, then a single id which continues it
+ self.assertEquals([1, 2, 3], self.id_tool.generateNewIdList(
+ id_generator='test_application_zodb',
+ id_group='a04', default=1, id_count=3))
+ self.assertEquals(4, self.id_tool.generateNewId(
+ id_generator='test_application_zodb',
+ id_group='a04'))
+ # sql generator, same id_group: the sequence starts again from default
+ self.assertEquals(1, self.id_tool.generateNewId(
+ id_generator='test_application_sql',
+ id_group='a04', default=1))
+ self.assertEquals([2, 3, 4], self.id_tool.generateNewIdList(
+ id_generator='test_application_sql',
+ id_group='a04', id_count=3))
def test_suite():
suite = unittest.TestSuite()
More information about the Erp5-report
mailing list