[Erp5-report] r15532 - in /erp5/trunk/products/ERP5SyncML: Conduit/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Aug 7 16:20:33 CEST 2007
Author: seb
Date: Tue Aug 7 16:20:33 2007
New Revision: 15532
URL: http://svn.erp5.org?rev=15532&view=rev
Log:
- add the VCardConduit and a unit test for it (by fabien)
Added:
erp5/trunk/products/ERP5SyncML/Conduit/VCardConduit.py (with props)
erp5/trunk/products/ERP5SyncML/tests/testERP5SyncMLVCard.py (with props)
Added: erp5/trunk/products/ERP5SyncML/Conduit/VCardConduit.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5SyncML/Conduit/VCardConduit.py?rev=15532&view=auto
==============================================================================
--- erp5/trunk/products/ERP5SyncML/Conduit/VCardConduit.py (added)
+++ erp5/trunk/products/ERP5SyncML/Conduit/VCardConduit.py Tue Aug 7 16:20:33 2007
@@ -1,0 +1,231 @@
+##############################################################################
+#
+# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
+# Fabien Morin <fabien.morin at gmail.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+from Products.ERP5SyncML.Conduit.ERP5Conduit import ERP5Conduit
+from AccessControl import ClassSecurityInfo
+from Products.ERP5Type import Permissions
+from Products.ERP5Type.Utils import convertToUpperCase
+from Products.CMFCore.utils import getToolByName
+from Products.ERP5SyncML.SyncCode import SyncCode
+from Products.ERP5SyncML.Subscription import Subscription
+from Acquisition import aq_base, aq_inner, aq_chain, aq_acquire
+from ZODB.POSException import ConflictError
+
+from zLOG import LOG
+
+class VCardConduit(ERP5Conduit, SyncCode):
+ """
+ A conduit is in charge to read data from a particular structure,
+ and then to save this data in another structure.
+
+ VCardConduit is a piece of code to update VCards from a text stream
+ """
+
+
+ # Declarative security
+ security = ClassSecurityInfo()
+
+ security.declareProtected(Permissions.AccessContentsInformation,'__init__')
+ def __init__(self):
+ self.args = {}
+
+
+ security.declareProtected(Permissions.ModifyPortalContent, 'addVCard')
+ def addNode(self, xml=None, object=None, previous_xml=None,
+ object_id=None, sub_object=None, force=0, simulate=0, **kw):
+ """
+ add a new person corresponding to the vcard
+ if the person already exist, she's updated
+ """
+ LOG('VCardConduit',0,'addNode, object=%s, object_id=%s, sub_object:%s, \
+ xml:\n%s' % (str(object), str(object_id), str(sub_object), xml))
+ portal_type = 'Person' #the VCard can just use Person
+ if sub_object is None:
+
+ new_object = ERP5Conduit.constructContent(self, object, object_id ,
+ None, portal_type)
+ else: #if the object exist, it juste must be update
+ new_object=sub_object
+ LOG('addNode', 0, 'new_object:%s, sub_object:%s' % (new_object, sub_object))
+ self.updateNode(xml=xml,
+ object=new_object,
+ force=force,
+ simulate=simulate,
+ **kw)
+ #in a first time, conflict are not used
+ return {'conflict_list':None, 'object': new_object}
+
+ def deleteNode(self, xml=None, object=None, object_id=None, force=None,
+ simulate=0, **kw):
+ """
+ A node is deleted
+ """
+ LOG('deleteNode :', 0, 'object:%s, object_id:%s' % (str(object), str(object_id)))
+ conflict_list = []
+ try:
+ object._delObject(object_id)
+ except (AttributeError, KeyError):
+ LOG('VCardConduit',0,'deleteNode, Unable to delete: %s' % str(object_id))
+ return conflict_list
+
+ def updateNode(self, xml=None, object=None, previous_xml=None, force=0,
+ simulate=0, **kw):
+ """
+ A node is updated
+ """
+ LOG('updateNode :',0, 'xml:%s, object:%s, previous_xml:%s, force:%s,simulate:%s, kw:%s' % (xml, object, previous_xml, force, simulate, kw))
+ vcard_dict = self.vcard2Dict(xml)
+ object.edit(**vcard_dict)
+ return []
+
+ def getCapabilitiesCTTypeList(self):
+ """
+ return the a list of CTType capabilities supported
+ """
+ return self.MEDIA_TYPE.values()
+
+ def getCapabilitiesVerCTList(self, capabilities_ct_type):
+ """
+ return a list of version of the CTType supported
+ """
+ #add here the other version supported
+ verCTTypeList = {}
+ verCTTypeList[self.MEDIA_TYPE['TEXT_VCARD']]=('3.0',)
+ verCTTypeList[self.MEDIA_TYPE['TEXT_XVCARD']]=('2.1',)
+ return verCTTypeList[capabilities_ct_type]
+
+
+ def getPreferedCapabilitieVerCT(self):
+ """
+ return the prefered capabilitie VerCT
+ """
+ prefered_version = '2.1'
+ return prefered_version
+
+ def getPreferedCapabilitieCTType(self):
+ """
+ return the prefered capabilitie VerCT
+ """
+ prefered_type = self.MEDIA_TYPE['TEXT_XVCARD']
+ return prefered_type
+
+ def getGidFromXML(self, vcard):
+ """
+ return the Gid composed of FirstName and LastName
+ """
+ vcard_dict = self.vcard2Dict(vcard)
+ gid_from_vcard = []
+ gid_from_vcard.append(vcard_dict['first_name'])
+ gid_from_vcard.append(' ')
+ gid_from_vcard.append(vcard_dict['last_name'])
+ gid_from_vcard = ''.join(gid_from_vcard)
+ return gid_from_vcard
+
+ def changePropertyEncoding(self, property_parameters_list,
+ property_value_list):
+ """
+ if there is a property 'ENCODING', change the string encoding to utf-8
+ """
+ encoding=''
+
+ for item in property_parameters_list :
+ if item.has_key('ENCODING'):
+ encoding = item['ENCODING']
+
+ property_value_list_well_incoded=[]
+ if encoding == 'QUOTED-PRINTABLE':
+ import mimify
+ for property_value in property_value_list:
+ property_value = mimify.mime_decode(property_value)
+ property_value_list_well_incoded.append(property_value)
+ #elif ... put here the other encodings
+ else:
+ property_value_list_well_incoded=property_value_list
+
+ return property_value_list_well_incoded
+
+ def vcard2Dict(self, vcard):
+ """
+ transalate the vcard to a dict understandable by erp5 like
+ {'fisrt_name':'MORIN', 'last_name':'Fabien'}
+ """
+ #LOG('vcard =',0,vcard)
+ convert_dict = {}
+ convert_dict['FN'] = 'first_name'
+ convert_dict['N'] = 'last_name'
+ convert_dict['TEL'] = 'default_telephone_text'
+ edit_dict = {}
+ vcard_list = vcard.split('\n')
+ for vcard_line in vcard_list:
+ if ':' in vcard_line:
+ property, property_value = vcard_line.split(':')
+ property_value_list=property_value.split(';')
+ property_parameters_list = []
+ property_name = ''
+ if ';' in property:
+ property_list = property.split(';')
+ property_name = property_list[0] #the property name is the 1st element
+ if len(property_list) > 1 and property_list[1] != '':
+ property_parameters_list = property_list[1:len(property_list)]
+ tmp = []
+ for property_parameter in property_parameters_list:
+ if '=' in property_parameter:
+ property_parameter_name, property_parameter_value = \
+ property_parameter.split('=')
+ else:
+ property_parameter_name = property_parameter
+ property_parameter_value = None
+ tmp.append({property_parameter_name:property_parameter_value})
+ property_parameters_list = tmp
+ #now property_parameters_list looks like :
+ # [{'ENCODING':'QUOTED-PRINTABLE'}, {'CHARSET':'UTF-8'}]
+
+ property_value_list = \
+ self.changePropertyEncoding(property_parameters_list,
+ property_value_list)
+
+ else:
+ property_name=property
+ if type(property_name) is type(u'a'):
+ property_name = property_name.encode('utf-8')
+
+ tmp=[]
+ for property_value in property_value_list:
+ if type(property_value) is type(u'a'):
+ property_value = property_value.encode('utf-8')
+ tmp.append(property_value)
+ property_value_list=tmp
+ if property_name in convert_dict.keys():
+ if property_name == 'N' and len(property_value_list) > 1:
+ edit_dict[convert_dict['N']]=property_value_list[0]
+ edit_dict[convert_dict['FN']]=property_value_list[1]
+ else:
+ edit_dict[convert_dict[property_name]]=property_value_list[0]
+ #LOG('edit_dict =',0,edit_dict)
+ return edit_dict
+
Propchange: erp5/trunk/products/ERP5SyncML/Conduit/VCardConduit.py
------------------------------------------------------------------------------
svn:executable = *
Added: erp5/trunk/products/ERP5SyncML/tests/testERP5SyncMLVCard.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5SyncML/tests/testERP5SyncMLVCard.py?rev=15532&view=auto
==============================================================================
--- erp5/trunk/products/ERP5SyncML/tests/testERP5SyncMLVCard.py (added)
+++ erp5/trunk/products/ERP5SyncML/tests/testERP5SyncMLVCard.py Tue Aug 7 16:20:33 2007
@@ -1,0 +1,227 @@
+##############################################################################
+# vim: set fileencoding=utf-8
+#
+# Copyright (c) 2007 Nexedi SARL and Contributors. All Rights Reserved.
+# Fabien Morin <fabien.morin at gmail.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+
+import os, sys
+# When run as a script, execute framework.py, looked up in sys.path[0],
+# inside this module namespace.
+if __name__ == '__main__':
+ execfile(os.path.join(sys.path[0], 'framework.py'))
+
+# Needed in order to have a log file inside the current folder
+os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log')
+# NOTE(review): '-300' presumably lets every zLOG severity through -- confirm
+os.environ['EVENT_LOG_SEVERITY'] = '-300'
+
+from Testing import ZopeTestCase
+from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
+from AccessControl.SecurityManagement import newSecurityManager
+from Products.ERP5SyncML.Conduit.VCardConduit import VCardConduit
+from Products.ERP5SyncML.SyncCode import SyncCode
+from testERP5SyncML import TestERP5SyncMLMixin
+from zLOG import LOG
+
+class TestERP5SyncMLVCard(TestERP5SyncMLMixin, ERP5TypeTestCase):
+
+ run_all_test = True
+
+ def getBusinessTemplateList(self):
+ """
+ Return the list of business templates.
+
+ the business template sync_crm give 3 folders:
+ /person_server
+ /person_client1 : empty
+ /person_client2 : empty
+ """
+ return ('erp5_base','fabien_bt')
+
+ def test_01_AddVCardPublication(self, quiet=0, run=run_all_test):
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Add a VCard Publication ')
+ LOG('Testing... ',0,'test_36_AddVCardPublication')
+ portal_id = self.getPortalName()
+ portal_sync = self.getSynchronizationTool()
+ portal_sync.manage_addPublication(self.pub_id, self.publication_url,
+ '/%s/person_server' % portal_id, 'Person', 'objectValues',
+ 'Person_exportAsVCard', 'VCardConduit', '', 'generateNewId',
+ 'getId', SyncCode.MEDIA_TYPE['TEXT_VCARD'])
+ pub = portal_sync.getPublication(self.pub_id)
+ self.failUnless(pub is not None)
+
+ def test_02_AddVCardSubscription1(self, quiet=0, run=run_all_test):
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Add First VCard Subscription ')
+ LOG('Testing... ',0,'test_02_AddVCardSubscription1')
+ portal_id = self.getPortalId()
+ portal_sync = self.getSynchronizationTool()
+ portal_sync.manage_addSubscription(self.sub_id1, self.publication_url,
+ self.subscription_url1, '/%s/person_client1' % portal_id,
+ 'Person', 'Person', 'objectValues', 'Person_exportAsVCard',
+ 'VCardConduit', '', 'generateNewId', 'getId',
+ SyncCode.MEDIA_TYPE['TEXT_VCARD'])
+ sub = portal_sync.getSubscription(self.sub_id1)
+ self.failUnless(sub is not None)
+
+ def test_03_AddVCardSubscription2(self, quiet=0, run=run_all_test):
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Add Second VCard Subscription ')
+ LOG('Testing... ',0,'test_03_AddVCardSubscription2')
+ portal_id = self.getPortalId()
+ portal_sync = self.getSynchronizationTool()
+ portal_sync.manage_addSubscription(self.sub_id2, self.publication_url,
+ self.subscription_url2, '/%s/person_client2' % portal_id,
+ 'Person', 'Person', 'objectValues', 'Person_exportAsVCard',
+ 'VCardConduit', '', 'generateNewId', 'getId',
+ SyncCode.MEDIA_TYPE['TEXT_VCARD'])
+ sub = portal_sync.getSubscription(self.sub_id2)
+ self.failUnless(sub is not None)
+
+ def test_04_FirstVCardSynchronization(self, quiet=0, run=run_all_test):
+ # We will try to populate the folder person_client1
+ # with the data form person_server
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest First VCard Synchronization ')
+ LOG('Testing... ',0,'test_04_FirstVCardSynchronization')
+ self.login()
+ self.test_01_AddVCardPublication(quiet=True, run=True)
+ self.test_02_AddVCardSubscription1(quiet=True, run=True)
+ self.test_03_AddVCardSubscription2(quiet=True, run=True)
+ nb_person = self.populatePersonServer(quiet=1,run=1)
+ portal_sync = self.getSynchronizationTool()
+ for sub in portal_sync.getSubscriptionList():
+ self.assertEquals(sub.getSynchronizationType(),SyncCode.SLOW_SYNC)
+ # Synchronize the first client
+ nb_message1 = self.synchronize(self.sub_id1)
+ for sub in portal_sync.getSubscriptionList():
+ if sub.getTitle() == self.sub_id1:
+ self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)
+ else:
+ self.assertEquals(sub.getSynchronizationType(),SyncCode.SLOW_SYNC)
+ self.failUnless(nb_message1==self.nb_message_first_synchronization)
+ # Synchronize the second client
+ nb_message2 = self.synchronize(self.sub_id2)
+ for sub in portal_sync.getSubscriptionList():
+ self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)
+ self.failUnless(nb_message2==self.nb_message_first_synchronization)
+ self.checkFirstSynchronization(id='1', nb_person=nb_person)
+
+ def test_05_basicVCardSynchronization(self, quiet=0, run=run_all_test):
+ """
+ synchronize two ERP5Sites using VCards
+ """
+
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Basic VCard Synchronization')
+ LOG('Testing... ',0,'test_05_basicVCardSynchronization')
+
+ self.test_04_FirstVCardSynchronization(quiet=True, run=True)
+
+
+ portal_sync = self.getSynchronizationTool()
+ person_server = self.getPersonServer()
+ person1_s = person_server._getOb(self.id1)
+ person_client1 = self.getPersonClient1()
+ person1_c = person_client1._getOb('1') #The new person is added with a
+ #generate id (the first is 1)
+
+ # try to synchronize
+ kw = {'first_name':self.first_name3,'last_name':self.last_name3}
+ person1_c.edit(**kw)
+ #before synchornization, First and Last name souldn't be the same
+ self.verifyFirstNameAndLastNameAreNotSynchronized(self.first_name3,
+ self.last_name3, person1_s, person1_c)
+ self.synchronize(self.sub_id1)
+ #after synchronization, a new person is create on the server
+ person1_s = person_server._getOb('1') #The new person is added on the
+ #serverwith a generate id (the first is 1)
+
+ #after the synchro, the client and server should be synchronized
+ self.checkSynchronizationStateIsSynchronized()
+ self.verifyFirstNameAndLastNameAreSynchronized(self.first_name3,
+ self.last_name3, person1_s, person1_c)
+
+ def test_05_verifyNoDuplicateDataWhenAdding(self, quiet=0, run=run_all_test):
+ """
+ this test permit to verify that if the server already have the person,
+ he don't add it a second time
+ """
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest No Duplicate Data When Adding')
+ LOG('Testing... ',0,'test_05_verifyNoDuplicateDataWhenAdding')
+ self.test_04_FirstVCardSynchronization(quiet=True, run=True)
+ portal_sync = self.getSynchronizationTool()
+ sub1 = portal_sync.getSubscription(self.sub_id1)
+ sub2 = portal_sync.getSubscription(self.sub_id2)
+ pub = portal_sync.getPublication(self.pub_id)
+
+ person_server = self.getPersonServer()
+ person1_s = person_server._getOb(self.id1)
+ person_client1 = self.getPersonClient1()
+ person1_c = person_client1._getOb('1') #The new person is added with a
+ #generate id (the first is 1)
+
+ # try to synchronize
+ kw = {'first_name':self.first_name3,'last_name':self.last_name3}
+ person1_c.edit(**kw)
+ person1_s.edit(**kw) #the same person is added on client AND server
+ #before synchornization, First and Last name souldn't be the same
+ self.verifyFirstNameAndLastNameAreSynchronized(self.first_name3,
+ self.last_name3, person1_s, person1_c)
+ nb_person_serv_before_sync = len(pub.getObjectList())
+ self.synchronize(self.sub_id1)
+ #after synchronization, no new person is created on server because it
+ #already have this person
+ #person1_s = person_server._getOb('1') #The new person is added on the
+ #serverwith a generate id (the first is 1)
+
+ #after the synchro, the client and server should be synchronized
+ self.checkSynchronizationStateIsSynchronized()
+ self.verifyFirstNameAndLastNameAreSynchronized(self.first_name3,
+ self.last_name3, person1_s, person1_c)
+
+ nb_person_serv_after_sync = len(pub.getObjectList())
+ #the number of person on server before and after the synchronization should
+ #be the same
+ nb_person_serv_after_sync = len(pub.getObjectList())
+ self.failUnless(nb_person_serv_after_sync==nb_person_serv_before_sync)
+
+
+
+# Run as a script: call framework() (presumably defined by the framework.py
+# executed at the top of this module -- confirm).
+# Otherwise define test_suite(), which returns a unittest suite made of the
+# tests of TestERP5SyncMLVCard.
+if __name__ == '__main__':
+ framework()
+else:
+ import unittest
+ def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestERP5SyncMLVCard))
+ return suite
Propchange: erp5/trunk/products/ERP5SyncML/tests/testERP5SyncMLVCard.py
------------------------------------------------------------------------------
svn:executable = *
More information about the Erp5-report
mailing list