[Erp5-report] r28220 - /erp5/trunk/products/ERP5SyncML/tests/testERP5DocumentSyncML.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Thu Jul 30 14:19:11 CEST 2009
Author: daniele
Date: Thu Jul 30 14:19:10 2009
New Revision: 28220
URL: http://svn.erp5.org?rev=28220&view=rev
Log:
Create test for synchronise many documents
Added:
erp5/trunk/products/ERP5SyncML/tests/testERP5DocumentSyncML.py
Added: erp5/trunk/products/ERP5SyncML/tests/testERP5DocumentSyncML.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5SyncML/tests/testERP5DocumentSyncML.py?rev=28220&view=auto
==============================================================================
--- erp5/trunk/products/ERP5SyncML/tests/testERP5DocumentSyncML.py (added)
+++ erp5/trunk/products/ERP5SyncML/tests/testERP5DocumentSyncML.py [utf8] Thu Jul 30 14:19:10 2009
@@ -1,0 +1,1117 @@
+# -*- coding: utf-8 -*-
+##############################################################################
+#
+# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
+# Danièle Vanbaelinghem <daniele at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+import os
+import unittest
+from Testing import ZopeTestCase
+from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
+from AccessControl.SecurityManagement import newSecurityManager
+from Products.ERP5SyncML.Conduit.ERP5DocumentConduit import ERP5DocumentConduit
+from Products.ERP5SyncML.SyncCode import SyncCode
+from zLOG import LOG
+from base64 import b16encode, b16decode
+import transaction
+from ERP5Diff import ERP5Diff
+from lxml import etree
+from Products.ERP5Type.tests.utils import FileUpload
+from Products.ERP5OOo.Document.OOoDocument import ConversionError
+
+ooodoc_coordinates = ('127.0.0.1', 8008)
+test_files = os.path.join(os.path.dirname(__file__), 'test_document')
+FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})"
+REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?"
+
+def makeFileUpload(name):
+ path = os.path.join(test_files, name)
+ return FileUpload(path, name)
+
+class TestERP5DocumentSyncMLMixin:
+
+ nb_objects = 100
+ #for objects
+ ids = range(1, nb_objects+1)
+ #id_max_text : number of document text
+ id_max_text = nb_objects/2
+ id1 = '2'
+ id2 = '3'
+ #for documents (encoding in unicode for utf-8)
+ #files
+ filename_text = 'TEST-en-002.txt'
+ size_filename_text = len(makeFileUpload(filename_text).read())
+ filename_odt = 'TEST-en-002.odt'
+ size_filename_odt = len(makeFileUpload(filename_odt).read())
+ filename_ppt = 'TEST-en-002.ppt'
+ size_filename_ppt = len(makeFileUpload(filename_ppt).read())
+ filename_pdf = 'TEST-en-002.pdf'
+ size_filename_pdf = len(makeFileUpload(filename_pdf).read())
+ #properties
+ reference1 = 'P-SYNCML.Text'
+ version1 = '001'
+ language1 = 'en'
+ #description1 - blaàéc1
+ description1 = 'description1 - blac\xc3\xa0\xc3\xa91'
+ short_title1 = 'P-SYNCML-Text'
+ reference2 = 'P-SYNCML-SyncML.Document.Pdf'
+ version2 = '001'
+ language2 = 'fr'
+ #description2 - file $£µ%c2éè!
+ description2 = 'description2 - file $\xc2\xa3\xc2\xb5%c2\xc3\xa9\xc3\xa8!'
+ short_title2 = 'P-SYNCML-Pdf'
+ reference3 = 'P-SYNCML-SyncML.Document.WebPage'
+ version3 = '001'
+ language3 = 'ja'
+ #description3 - file description3 - file ù@
+ description3 = 'description3 - file \xc3\xb9@'
+ short_title3 = 'P-SYNCML-WebPage'
+ #for synchronization
+ pub_id = 'Publication'
+ sub_id1 = 'Subscription1'
+ sub_id_from_server = 'SubscriptionFromServer'
+ pub_query = 'objectValues'
+ sub_query1 = 'objectValues'
+ xml_mapping = 'asXML'
+ pub_conduit = 'ERP5DocumentConduit'
+ sub_conduit1 = 'ERP5DocumentConduit'
+ activity_enabled = True
+ publication_url = 'file://tmp/sync_server'
+ subscription_url = { 'two_way' : 'file://tmp/sync_client1', \
+ 'from_server' : 'file://tmp/sync_client_from_server'}
+ #for this tests
+ nb_message_first_synchronization = 12
+ nb_message_multi_first_synchronization = 12
+ nb_synchronization = 2
+ nb_subscription = 1
+ nb_publication = 1
+ #default edit_workflow
+ workflow_id = 'processing_status_workflow'
+
+
+ def getBusinessTemplateList(self):
+ """
+ Return the list of business templates.
+
+ the business template sync_crm give 3 folders:
+ /document_server
+ /document_client1 : empty
+ /document_client_from_server : empty
+ """
+ return ('erp5_base', 'erp5_ingestion',\
+ 'erp5_ingestion_mysql_innodb_catalog', 'erp5_web',\
+ 'erp5_dms')
+
+
+ def afterSetUp(self):
+ """Setup."""
+ self.login()
+ self.addPublications()
+ self.addSubscriptions()
+ self.portal = self.getPortal()
+ self.setSystemPreferences()
+ transaction.commit()
+ self.tic()
+
+ def beforeTearDown(self):
+ """
+ Do some stuff after each test:
+ - clear document module of server and client
+ - clear the publications and subscriptions
+ """
+ self.clearDocumentModules()
+ self.clearPublicationsAndSubscriptions()
+
+ def setSystemPreferences(self):
+ default_pref = self.portal.portal_preferences.default_site_preference
+ default_pref.setPreferredOoodocServerAddress(ooodoc_coordinates[0])
+ default_pref.setPreferredOoodocServerPortNumber(ooodoc_coordinates[1])
+ default_pref.setPreferredDocumentFileNameRegularExpression(FILE_NAME_REGULAR_EXPRESSION)
+ default_pref.setPreferredDocumentReferenceRegularExpression(REFERENCE_REGULAR_EXPRESSION)
+ if default_pref.getPreferenceState() == 'disabled':
+ default_pref.enable()
+
+ def addSubscriptions(self):
+ portal_id = self.getPortalId()
+ portal_sync = self.getSynchronizationTool()
+ if portal_sync.getSubscription(self.sub_id1) is None:
+ portal_sync.manage_addSubscription(title=self.sub_id1,
+ publication_url=self.publication_url,
+ subscription_url=self.subscription_url['two_way'],
+ destination_path='/%s/document_client1' % portal_id,
+ source_uri='Document:',
+ target_uri='Document',
+ query= self.sub_query1,
+ xml_mapping=self.xml_mapping,
+ conduit=self.sub_conduit1,
+ #alert_code=SyncCode.TWO_WAY,
+ gpg_key='',
+ activity_enabled=True,
+ login='daniele',
+ password='myPassword')
+ sub = portal_sync.getSubscription(self.sub_id1)
+ self.assertTrue(sub is not None)
+
+ def addPublications(self):
+ portal_id = self.getPortalName()
+ portal_sync = self.getSynchronizationTool()
+ if portal_sync.getPublication(self.pub_id) is None:
+ portal_sync.manage_addPublication(title=self.pub_id,
+ publication_url=self.publication_url,
+ destination_path='/%s/document_server' % portal_id,
+ source_uri='Document',
+ query=self.pub_query,
+ xml_mapping=self.xml_mapping,
+ conduit=self.pub_conduit,
+ gpg_key='',
+ activity_enabled=True,
+ authentication_format='b64',
+ authentication_type='syncml:auth-basic')
+ pub = portal_sync.getPublication(self.pub_id)
+ self.assertTrue(pub is not None)
+
+ def createDocumentModules(self, OneWay=False):
+ self.login()
+ if not hasattr(self.portal, 'document_server'):
+ self.portal.portal_types.constructContent(type_name = 'Document Module',
+ container = self.portal,
+ id = 'document_server')
+
+ if not hasattr(self.portal, 'document_client1'):
+ self.portal.portal_types.constructContent(type_name = 'Document Module',
+ container = self.portal,
+ id = 'document_client1')
+
+ if OneWay:
+ if not hasattr(self.portal, 'document_client_from_server'):
+ self.portal.portal_types.constructContent(type_name = 'Document Module',
+ container = self.portal,
+ id = 'document_client_from_server')
+ def clearDocumentModules(self):
+ portal_sync = self.getSynchronizationTool()
+ document_server = self.getDocumentServer()
+ if document_server is not None:
+ self.portal._delObject(id='document_server')
+ self.portal._delObject(id='document_client1')
+ transaction.commit()
+ self.tic()
+
+ def clearPublicationsAndSubscriptions(self):
+ portal_sync = self.getSynchronizationTool()
+ for pub in portal_sync.getPublicationList():
+ portal_sync.manage_deletePublication(pub.getTitle())
+ for sub in portal_sync.getSubscriptionList():
+ portal_sync.manage_deleteSubscription(sub.getTitle())
+ transaction.commit()
+ self.tic()
+
+ ####################
+ ### Usefull methods
+ ####################
+
+ def getSynchronizationTool(self):
+ return getattr(self.portal, 'portal_synchronizations', None)
+
+ def getDocumentClient1(self):
+ return getattr(self.portal, 'document_client1', None)
+
+ def getDocumentClientFromServer(self):
+ return getattr(self.portal, 'document_client_from_server', None)
+
+ def getDocumentServer(self):
+ return getattr(self.portal, 'document_server', None)
+
+ def getPortalId(self):
+ return self.portal.getId()
+
+ def login(self, quiet=0):
+ uf = self.portal.acl_users
+ uf._doAddUser('daniele', 'myPassword', ['Manager'], [])
+ uf._doAddUser('ERP5TypeTestCase', '', ['Manager'], [])
+ uf._doAddUser('syncml', '', ['Manager'], [])
+ user = uf.getUserById('daniele').__of__(uf)
+ newSecurityManager(None, user)
+
+ def resetSignaturePublicationAndSubscription(self):
+ portal_sync = self.getSynchronizationTool()
+ publication = portal_sync.getPublication(self.pub_id)
+ subscription1 = portal_sync.getSubscription(self.sub_id1)
+ publication.resetAllSubscribers()
+ subscription1.resetAllSignatures()
+ transaction.commit()
+ self.tic()
+
+ def documentMultiServer(self, quiet=0, run=0):
+ # create different document by category documents
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Document Multi Server')
+ LOG('Testing... ', 0, 'documentMultiServer')
+ self.createDocumentModules()
+ document_id = ''
+ document_server = self.getDocumentServer()
+ #plain text document
+ for id in self.ids[:self.id_max_text]:
+ reference = "Test-Text-%s" % (id,)
+ self.createDocument(id=id, file_name=self.filename_text, reference=reference)
+ transaction.commit()
+ nb_document = len(document_server.objectValues())
+ self.assertEqual(nb_document, len(self.ids[:self.id_max_text]))
+ #binary document
+ for id in self.ids[self.id_max_text:]:
+ reference = "Test-Odt-%s" % (id, )
+ self.createDocument(id=id, file_name=self.filename_odt, reference=reference)
+ transaction.commit()
+ self.tic()
+ nb_document = len(document_server.objectValues())
+ self.assertEqual(nb_document, len(self.ids))
+ return nb_document
+
+ def documentServer(self, quiet=0, run=0, OneWay=False):
+ # XXX create classification category for documents
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Document Server')
+ LOG('Testing... ', 0, 'documentServer')
+ self.createDocumentModules(OneWay)
+ document_id = ''
+ document_server = self.getDocumentServer()
+ if getattr(document_server, self.id1, None) is not None:
+ self.clearDocumentModules()
+ document_text = document_server.newContent(id=self.id1,\
+ portal_type='Text')
+ kw = {'reference':self.reference1, 'Version':self.version1,\
+ 'Language':self.language1, 'Description':self.description1}
+ document_text.edit(**kw)
+ file = makeFileUpload(self.filename_text)
+ document_text.edit(file=file)
+ #document_text.convertToBaseFormat()
+ transaction.commit()
+ self.tic()
+ document_pdf = document_server.newContent(id=self.id2,\
+ portal_type='PDF')
+ kw = {'reference':self.reference2, 'Version':self.version2,\
+ 'Language':self.language2, 'Description':self.description2}
+ document_pdf.edit(**kw)
+ file = makeFileUpload(self.filename_pdf)
+ document_pdf.edit(file=file)
+ #document_pdf.convertToBaseFormat()
+ transaction.commit()
+ self.tic()
+ nb_document = len(document_server.objectValues())
+ self.assertEqual(nb_document, 2)
+ return nb_document
+
+ def createDocument(self, id, file_name=None, portal_type='Text',
+ reference='P-SYNCML.Text', version='001', language='en'):
+ """
+ Create a text document
+ """
+ document_server = self.getDocumentServer()
+ if getattr(document_server, str(id), None) is not None:
+ self.clearDocumentModules()
+ doc_text = document_server.newContent(id=id, portal_type=portal_type)
+ kw = {'reference': reference, 'version': version, 'language': language}
+ doc_text.edit(**kw)
+ if file_name is not None:
+ file = makeFileUpload(file_name)
+ doc_text.edit(file=file)
+ return doc_text
+
+ def synchronize(self, id, run=1):
+ """
+ This just define how we synchronize, we have
+ to define it here because it is specific to the unit testing
+ """
+ portal_sync = self.getSynchronizationTool()
+ subscription = portal_sync.getSubscription(id)
+ publication = None
+ for pub in portal_sync.getPublicationList():
+ if pub.getPublicationUrl()==subscription.getPublicationUrl():
+ publication = pub
+ self.assertTrue(publication is not None)
+ # reset files, because we do sync by files
+ file = open(subscription.getSubscriptionUrl()[len('file:/'):], 'w')
+ file.write('')
+ file.close()
+ file = open(self.publication_url[len('file:/'):], 'w')
+ file.write('')
+ file.close()
+ transaction.commit()
+ self.tic()
+ nb_message = 1
+ result = portal_sync.SubSync(subscription.getPath())
+ while result['has_response']==1:
+ portal_sync.PubSync(publication.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ result = portal_sync.SubSync(subscription.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ nb_message += 1 + result['has_response']
+ return nb_message
+
+ def synchronizeWithBrokenMessage(self, id, run=1):
+ """
+ This just define how we synchronize, we have
+ to define it here because it is specific to the unit testing
+ """
+ portal_sync = self.getSynchronizationTool()
+ #portal_sync.email = None # XXX To be removed
+ subscription = portal_sync.getSubscription(id)
+ publication = None
+ for publication in portal_sync.getPublicationList():
+ if publication.getPublicationUrl()==subscription.getSubscriptionUrl():
+ publication = publication
+ self.assertTrue(publication is not None)
+ # reset files, because we do sync by files
+ file = open(subscription.getSubscriptionUrl()[len('file:/'):], 'w')
+ file.write('')
+ file.close()
+ file = open(self.publication_url[len('file:/'):], 'w')
+ file.write('')
+ file.close()
+ transaction.commit()
+ self.tic()
+ nb_message = 1
+ result = portal_sync.SubSync(subscription.getPath())
+ while result['has_response']==1:
+ # We do thing three times, so that we will test
+ # if we manage well duplicate messages
+ portal_sync.PubSync(publication.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ portal_sync.PubSync(publication.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ portal_sync.PubSync(publication.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ result = portal_sync.SubSync(subscription.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ result = portal_sync.SubSync(subscription.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ result = portal_sync.SubSync(subscription.getPath())
+ if self.activity_enabled:
+ transaction.commit()
+ self.tic()
+ nb_message += 1 + result['has_response']
+ return nb_message
+
+ def checkSynchronizationStateIsSynchronized(self, quiet=0, run=1):
+ portal_sync = self.getSynchronizationTool()
+ document_server = self.getDocumentServer()
+ for document in document_server.objectValues():
+ state_list = portal_sync.getSynchronizationState(document)
+ for state in state_list:
+ self.assertEqual(state[1], state[0].SYNCHRONIZED)
+ document_client1 = self.getDocumentClient1()
+ for document in document_client1.objectValues():
+ state_list = portal_sync.getSynchronizationState(document)
+ for state in state_list:
+ self.assertEqual(state[1], state[0].SYNCHRONIZED)
+ # Check for each signature that the tempXML is None
+ for sub in portal_sync.getSubscriptionList():
+ for m in sub.getSignatureList():
+ self.assertEquals(m.getTempXML(), None)
+ self.assertEquals(m.getPartialXML(), None)
+ for pub in portal_sync.getPublicationList():
+ for sub in pub.getSubscriberList():
+ for m in sub.getSignatureList():
+ self.assertEquals(m.getPartialXML(), None)
+
+ def checkFirstSynchronization(self, nb_document=0):
+
+ portal_sync = self.getSynchronizationTool()
+ subscription1 = portal_sync.getSubscription(self.sub_id1)
+ self.assertEqual(len(subscription1.getObjectList()), nb_document)
+ document_server = self.getDocumentServer() # We also check we don't
+ # modify initial ob
+ doc1_s = document_server._getOb(self.id1)
+ self.assertEqual(doc1_s.getId(), self.id1)
+ self.assertEqual(doc1_s.getReference(), self.reference1)
+ self.assertEqual(doc1_s.getVersion(), self.version1)
+ self.assertEqual(doc1_s.getLanguage(), self.language1)
+ self.assertEqual(doc1_s.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, doc1_s.get_size())
+ doc2_s = document_server._getOb(self.id2)
+ self.assertEqual(doc2_s.getReference(), self.reference2)
+ self.assertEqual(doc2_s.getVersion(), self.version2)
+ self.assertEqual(doc2_s.getLanguage(), self.language2)
+ self.assertEqual(doc2_s.getSourceReference(), self.filename_pdf)
+ self.assertEquals(self.size_filename_pdf, doc2_s.get_size())
+ document_client1 = self.getDocumentClient1()
+ document_c = document_client1._getOb(self.id1)
+ self.assertEqual(document_c.getId(), self.id1)
+ self.assertEqual(document_c.getReference(), self.reference1)
+ self.assertEqual(document_c.getVersion(), self.version1)
+ self.assertEqual(document_c.getLanguage(), self.language1)
+ self.assertEqual(document_c.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, document_c.get_size())
+ self.assertXMLViewIsEqual(self.sub_id1, doc1_s, document_c)
+ self.assertXMLViewIsEqual(self.sub_id1, doc2_s,\
+ document_client1._getOb(self.id2))
+
+ def checkDocument(self, id=id, document=None, filename=None,
+ size_filename=None, reference='P-SYNCML.Text',
+ portal_type='Text', version='001', language='en',
+ description=''):
+ """
+ Check synchronization with a document and the informations provided
+ """
+ if document is not None:
+ self.assertEqual(document.getId(), id)
+ self.assertEqual(document.getReference(), reference)
+ self.assertEqual(document.getVersion(), version)
+ self.assertEqual(document.getLanguage(), language)
+ self.assertEqual(document.getDescription(), description)
+ if filename is not None:
+ self.assertEqual(document.getSourceReference(), filename)
+ self.assertEquals(size_filename, document.get_size())
+ else:
+ self.fail("Document is None for check these informations")
+
+ def checkXMLsSynchronized(self):
+ document_server = self.getDocumentServer()
+ document_client1 = self.getDocumentClient1()
+ for id in self.ids:
+ doc_s = document_server._getOb(str(id))
+ doc_c = document_client1._getOb(str(id))
+ self.assertXMLViewIsEqual(self.sub_id1, doc_s, doc_c)
+
+
+ def checkFirstSynchronizationWithMultiDocument(self, nb_document=0):
+ portal_sync = self.getSynchronizationTool()
+ subscription1 = portal_sync.getSubscription(self.sub_id1)
+ self.assertEqual(len(subscription1.getObjectList()), nb_document)
+ document_server = self.getDocumentServer()
+ document_client1 = self.getDocumentClient1()
+ id = str(self.ids[0])
+ doc_text_s = document_server._getOb(id)
+ reference = 'Test-Text-%s' % id
+ self.checkDocument(id=id, document=doc_text_s,
+ reference=reference,
+ filename=self.filename_text,
+ size_filename=self.size_filename_text)
+ id = str(self.ids[self.id_max_text])
+ doc_odt_s = document_server._getOb(id)
+ reference = 'Test-Odt-%s' % id
+ self.checkDocument(id=id, document=doc_odt_s,
+ reference=reference,
+ filename=self.filename_odt,
+ size_filename=self.size_filename_odt)
+ self.checkXMLsSynchronized()
+
+ def assertXMLViewIsEqual(self, sub_id, object_pub=None, object_sub=None,\
+ force=0):
+ """
+ Check the equality between two xml objects with gid as id
+ """
+ portal_sync = self.getSynchronizationTool()
+ subscription = portal_sync.getSubscription(sub_id)
+ publication = portal_sync.getPublication(self.pub_id)
+ gid_pub = publication.getGidFromObject(object_pub)
+ gid_sub = publication.getGidFromObject(object_sub)
+ self.assertEqual(gid_pub, gid_sub)
+ conduit = ERP5DocumentConduit()
+ xml_pub = conduit.getXMLFromObjectWithGid(object=object_pub, gid=gid_pub,\
+ xml_mapping=publication.getXMLMapping())
+ #if One Way From Server there is not xml_mapping for subscription
+ xml_sub = conduit.getXMLFromObjectWithGid(object=object_sub, gid=gid_sub,\
+ xml_mapping=subscription.getXMLMapping(force))
+ erp5diff = ERP5Diff()
+ erp5diff.compare(xml_pub, xml_sub)
+ result = erp5diff.outputString()
+ result = etree.XML(result)
+ if len(result) != 0 :
+ for update in result:
+ #XXX edit workflow is not replaced, so discard workflow checking
+ if update.get('select').find("workflow") != -1 :
+ continue
+ else :
+ self.fail('diff between pub:\n%s \n => \n%s' %\
+ (xml_pub, etree.tostring(result, pretty_print=True)))
+
+
+class TestERP5DocumentSyncML(TestERP5DocumentSyncMLMixin, ERP5TypeTestCase):
+
+ run_all_test = True
+ def getTitle(self):
+ """
+ """
+ return "ERP5 SyncML"
+
+
+ def setupPublicationAndSubscriptionIdGenerator(self, quiet=0, run=run_all_test):
+ portal_sync = self.getSynchronizationTool()
+ sub1 = portal_sync.getSubscription(self.sub_id1)
+ pub = portal_sync.getPublication(self.pub_id)
+ pub.setSynchronizationIdGenerator('generateNewId')
+ sub1.setSynchronizationIdGenerator('generateNewId')
+
+ def checkSynchronizationStateIsConflict(self, quiet=0, run=1,
+ portal_type='Text'):
+ portal_sync = self.getSynchronizationTool()
+ document_server = self.getDocumentServer()
+ for document in document_server.objectValues():
+ if document.getId()==self.id1:
+ state_list = portal_sync.getSynchronizationState(document)
+ for state in state_list:
+ self.assertEqual(state[1], state[0].CONFLICT)
+ document_client1 = self.getDocumentClient1()
+ for document in document_client1.objectValues():
+ if document.getId()==self.id1:
+ state_list = portal_sync.getSynchronizationState(document)
+ for state in state_list:
+ self.assertEqual(state[1], state[0].CONFLICT)
+ # make sure sub object are also in a conflict mode
+ document = document_client1._getOb(self.id1)
+ state_list = portal_sync.getSynchronizationState(document)
+ for state in state_list:
+ self.assertEqual(state[1], state[0].CONFLICT)
+
+ def test_01_GetSynchronizationList(self, quiet=0, run=run_all_test):
+ # This test the getSynchronizationList, ie,
+ # We want to see if we retrieve both the subscription
+ # and the publication
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest getSynchronizationList ')
+ LOG('Testing... ', 0, 'test_01_GetSynchronizationList')
+ portal_sync = self.getSynchronizationTool()
+ synchronization_list = portal_sync.getSynchronizationList()
+ self.assertEqual(len(synchronization_list), self.nb_synchronization)
+
+ def test_02_FirstSynchronization(self, quiet=0, run=run_all_test):
+ # We will try to populate the folder document_client1
+ # with the data form document_server
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest First Synchronization ')
+ LOG('Testing... ', 0, 'test_02_FirstSynchronization')
+ nb_document = self.documentServer(quiet=1, run=1)
+ portal_sync = self.getSynchronizationTool()
+ for sub in portal_sync.getSubscriptionList():
+ self.assertEquals(sub.getSynchronizationType(), SyncCode.SLOW_SYNC)
+ document_server = self.getDocumentServer()
+ doc1_s = document_server._getOb(self.id1)
+ self.assertEqual(doc1_s.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, doc1_s.get_size())
+ doc2_s = document_server._getOb(self.id2)
+ self.assertEqual(doc2_s.getSourceReference(), self.filename_pdf)
+ self.assertEquals(self.size_filename_pdf, doc2_s.get_size())
+ # Synchronize the first client
+ nb_message1 = self.synchronize(self.sub_id1)
+ for sub in portal_sync.getSubscriptionList():
+ if sub.getTitle() == self.sub_id1:
+ self.assertEquals(sub.getSynchronizationType(), SyncCode.TWO_WAY)
+ else:
+ self.assertEquals(sub.getSynchronizationType(), SyncCode.SLOW_SYNC)
+ self.assertEqual(nb_message1, self.nb_message_first_synchronization)
+ self.checkSynchronizationStateIsSynchronized()
+ self.checkFirstSynchronization(nb_document=nb_document)
+
+ def test_03_UpdateSimpleData(self, quiet=0, run=run_all_test):
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Update Simple Data ')
+ LOG('Testing... ', 0, 'test_03_UpdateSimpleData')
+ # Add two objects
+ self.test_02_FirstSynchronization(quiet=1, run=1)
+ # First we do only modification on server
+ portal_sync = self.getSynchronizationTool()
+ document_server = self.getDocumentServer()
+ document_s = document_server._getOb(self.id1)
+ kw = {'reference':self.reference3, 'language':self.language3,
+ 'version':self.version3}
+ document_s.edit(**kw)
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ document_client1 = self.getDocumentClient1()
+ document_c = document_client1._getOb(self.id1)
+ self.assertEqual(document_s.getReference(), self.reference3)
+ self.assertEqual(document_s.getLanguage(), self.language3)
+ self.assertEqual(document_s.getVersion(), self.version3)
+ self.assertEqual(document_c.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, document_c.get_size())
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c)
+ # Then we do only modification on a client (the gid) of client => add a object
+ kw = {'reference':self.reference1,'version':self.version3}
+ document_c.edit(**kw)
+ file = makeFileUpload(self.filename_odt)
+ document_c.edit(file=file)
+ #document_c.convertToBaseFormat()
+ document_c.reindexObject()
+ transaction.commit()
+ self.tic()
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ document_s = document_server._getOb(self.id1)
+ self.assertEqual(document_s.getReference(), self.reference1)
+ self.assertEqual(document_s.getLanguage(), self.language3)
+ self.assertEqual(document_s.getVersion(), self.version3)
+ self.assertEqual(document_c.getSourceReference(), self.filename_odt)
+ self.assertEquals(self.size_filename_odt, document_c.get_size())
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c)
+ # Then we do only modification the field (useless for the gid)
+ # on both the client and the server and of course, on the same object
+ kw = {'description':self.description2}
+ document_s.edit(**kw)
+ kw = {'short_title':self.short_title1}
+ document_c.edit(**kw)
+ # The gid is modify so the synchronization add a object
+ transaction.commit()
+ self.tic()
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ document_s = document_server._getOb(self.id1)
+ self.assertEqual(document_s.getDescription(), self.description2)
+ self.assertEqual(document_s.getShortTitle(), self.short_title1)
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c)
+
+ def test_04_DeleteObject(self, quiet=0, run=run_all_test):
+ """
+ We will do a first synchronization, then delete an object on both
+ sides, and we will see if nothing is left on the server and also
+ on the two clients
+ """
+ if not run: return
+ self.test_02_FirstSynchronization(quiet=1, run=1)
+ if not quiet:
+ ZopeTestCase._print('\nTest Delete Object ')
+ LOG('Testing... ', 0, 'test_04_DeleteObject')
+ document_server = self.getDocumentServer()
+ document_server.manage_delObjects(self.id1)
+ document_client1 = self.getDocumentClient1()
+ document_client1.manage_delObjects(self.id2)
+ transaction.commit()
+ self.tic()
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ portal_sync = self.getSynchronizationTool()
+ publication = portal_sync.getPublication(self.pub_id)
+ subscription1 = portal_sync.getSubscription(self.sub_id1)
+ self.assertEqual(len(publication.getObjectList()), 0)
+ self.assertEqual(len(subscription1.getObjectList()), 0)
+
+ def test_05_FirstMultiSynchronization(self, quiet=0, run=run_all_test):
+ #Add document on the server and first synchronization for client
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest First Multi Synchronization ')
+ LOG('Testing... ', 0, 'test_05_FirstMultiSynchronization')
+ nb_document = self.documentMultiServer(quiet=1, run=1)
+ portal_sync = self.getSynchronizationTool()
+ document_server = self.getDocumentServer()
+ document_client = self.getDocumentClient1()
+ nb_message1 = self.synchronize(self.sub_id1)
+ self.assertNotEqual(nb_message1, 6)
+ # It has transmitted some object
+ for sub in portal_sync.getSubscriptionList():
+ self.assertEquals(sub.getSynchronizationType(), SyncCode.TWO_WAY)
+ self.checkSynchronizationStateIsSynchronized()
+ self.checkFirstSynchronizationWithMultiDocument(nb_document=nb_document)
+
+ def test_06_UpdateMultiData(self, quiet=0, run=run_all_test):
+ # Add various data in server
+ # modification in client and server for synchronize
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Update Multi Data ')
+ LOG('Testing... ', 0, 'test_06_UpdateMultiData')
+ self.test_05_FirstMultiSynchronization(quiet=1, run=1)
+ # Side server modification gid of a text document
+ document_server = self.getDocumentServer()
+ document_client1= self.getDocumentClient1()
+ id_text = str(self.ids[3])
+ doc_s = document_server._getOb(id_text)
+ kw = {'reference':self.reference3, 'language':self.language3,
+ 'version':self.version3, 'description':self.description3}
+ doc_s.edit(**kw)
+ # Side client modification gid of a odt document
+ id_odt = str(self.ids[self.id_max_text+1])
+ doc_c = document_client1._getOb(id_odt)
+ kw = {'reference':self.reference2, 'language':self.language2,
+ 'version':self.version2, 'description':self.description2}
+ doc_c.edit(**kw)
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ # Check that the datas modified after synchronization
+ doc_s = document_server._getOb(id_text)
+ self.checkDocument(id=id_text, document=doc_s,\
+ filename=self.filename_text,\
+ size_filename=self.size_filename_text,\
+ reference=self.reference3,\
+ language=self.language3,\
+ version=self.version3,\
+ description = self.description3)
+ doc_c = document_client1._getOb(id_odt)
+ self.checkDocument(id=id_odt, document=doc_c,\
+ filename=self.filename_odt,\
+ size_filename=self.size_filename_odt,\
+ reference=self.reference2,\
+ language=self.language2,\
+ version=self.version2,\
+ description = self.description2)
+ # Others
+ doc_c = document_client1._getOb(str(self.ids[2]))
+ reference = 'Test-Text-%s' % str(self.ids[2])
+ self.checkDocument(id=str(self.ids[2]), document=doc_c,\
+ reference=reference, filename=self.filename_text,\
+ size_filename=self.size_filename_text)
+ # Check the XMLs
+ self.checkXMLsSynchronized()
+ # Replace description and filename
+ doc_s = document_server._getOb(id_text)
+ kw = {'description':self.description1}
+ doc_s.edit(**kw)
+ file = makeFileUpload(self.filename_odt)
+ doc_s.edit(file=file)
+ #doc_s.convertToBaseFormat()
+ # Side client modification gid of a odt document
+ id_odt = str(self.ids[self.id_max_text+1])
+ doc_c = document_client1._getOb(id_odt)
+ kw = {'description':self.description3}
+ doc_c.edit(**kw)
+ file = makeFileUpload(self.filename_text)
+ doc_c.edit(file=file)
+ #doc_c.convertToBaseFormat()
+ transaction.commit()
+ self.tic()
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ # Check that the datas modified after synchronization
+ doc_s = document_server._getOb(id_text)
+ self.checkDocument(id=id_text, document=doc_s,\
+ filename=self.filename_odt,\
+ size_filename=self.size_filename_odt,\
+ reference=self.reference3,\
+ language=self.language3,\
+ version=self.version3,\
+ description=self.description1)
+ doc_c = document_client1._getOb(id_odt)
+ self.checkDocument(id=id_odt, document=doc_c,\
+ filename=self.filename_text,\
+ size_filename=self.size_filename_text,\
+ reference=self.reference2,\
+ language=self.language2,\
+ version=self.version2,\
+ description = self.description3)
+ doc_c = document_client1._getOb(str(self.ids[2]))
+ reference = 'Test-Text-%s' % str(self.ids[2])
+ self.checkDocument(id=str(self.ids[2]), document=doc_c,\
+ reference=reference, filename=self.filename_text,\
+ size_filename=self.size_filename_text)
+ # Check the XMLs
+ self.checkXMLsSynchronized()
+
+ def test_07_SynchronizeWithStrangeIdGenerator(self, quiet=0, run=run_all_test):
+ """
+ By default, the synchronization process use the id in order to
+ recognize objects (because by default, getGid==getId. Here, we will see
+ if it also works with a somewhat strange getGid
+ """
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Synchronize With Strange Gid ')
+ LOG('Testing... ', 0, 'test_07_SynchronizeWithStrangeIdGenerator')
+ self.login()
+ self.setupPublicationAndSubscriptionIdGenerator(quiet=1, run=1)
+ nb_document = self.documentServer(quiet=1, run=1)
+ # This will test adding object
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ portal_sync = self.getSynchronizationTool()
+ subscription1 = portal_sync.getSubscription(self.sub_id1)
+ self.assertEqual(len(subscription1.getObjectList()), nb_document)
+ publication = portal_sync.getPublication(self.pub_id)
+ self.assertEqual(len(publication.getObjectList()), nb_document)
+ gid = self.reference1 + '-' + self.version1 + '-' + self.language1 # ie the title ''
+ gid = b16encode(gid)
+ document_c1 = subscription1.getObjectFromGid(gid)
+ id_c1 = document_c1.getId()
+ self.assertTrue(id_c1 in ('1', '2')) # id given by the default generateNewId
+ document_s = publication.getSubscriber(self.subscription_url['two_way']).getObjectFromGid(gid)
+ id_s = document_s.getId()
+ self.assertEqual(id_s, self.id1)
+ # This will test updating object
+ document_s.setDescription(self.description3)
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ self.assertEqual(document_s.getDescription(), self.description3)
+ self.assertEqual(document_c1.getDescription(), self.description3)
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c1)
+ # This will test deleting object
+ document_server = self.getDocumentServer()
+ document_client1 = self.getDocumentClient1()
+ document_server.manage_delObjects(self.id2)
+ transaction.commit()
+ self.tic()
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ self.assertEqual(len(subscription1.getObjectList()), (nb_document-1))
+ self.assertEqual(len(publication.getObjectList()), (nb_document-1))
+ document_s = publication.getSubscriber(self.subscription_url['two_way']).getObjectFromGid(gid)
+ id_s = document_s.getId()
+ self.assertEqual(id_s, self.id1)
+ document_c1 = subscription1.getObjectFromGid(gid)
+ id_c1 = document_c1.getId()
+ self.assertTrue(id_c1 in ('1', '2')) # id given by the default generateNewId
+ self.assertEqual(document_s.getDescription(), self.description3)
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c1)
+
+ def test_08_MultiNodeConflict(self, quiet=0, run=run_all_test):
+ """
+ We will create conflicts with 3 differents nodes, and we will
+ solve it by taking one full version of documents.
+ """
+ #not conflict because is gid
+ if not run: return
+ self.test_02_FirstSynchronization(quiet=1, run=1)
+ if not quiet:
+ ZopeTestCase._print('\nTest Multi Node Conflict ')
+ LOG('Testing... ', 0, 'test_08_MultiNodeConflict')
+ portal_sync = self.getSynchronizationTool()
+ document_server = self.getDocumentServer()
+ document_s = document_server._getOb(self.id1)
+ kw = {'description':self.description2, 'short_title':self.short_title2}
+ document_s.edit(**kw)
+ file = makeFileUpload(self.filename_ppt)
+ # XXX error with filename_pdf , may be is a PDF?
+ document_s.edit(file=file)
+ transaction.commit()
+ self.tic()
+ document_client1 = self.getDocumentClient1()
+ document_c1 = document_client1._getOb(self.id1)
+ kw = {'description':self.description3, 'short_title':self.short_title3}
+ document_c1.edit(**kw)
+ file = makeFileUpload(self.filename_odt)
+ document_c1.edit(file=file)
+ #document_c1.convertToBaseFormat()
+ transaction.commit()
+ self.tic()
+ self.synchronize(self.sub_id1)
+ conflict_list = portal_sync.getConflictList()
+ self.assertEqual(len(conflict_list), 7)
+ # check if we have the state conflict on all clients
+ self.checkSynchronizationStateIsConflict()
+ # we will take :
+ # description et file on document_server
+ # short_title on document_client1
+ for conflict in conflict_list :
+ subscriber = conflict.getSubscriber()
+ property = conflict.getPropertyId()
+ resolve = 0
+ if property == 'description':
+ if subscriber.getSubscriptionUrl()==self.publication_url:
+ resolve = 1
+ conflict.applySubscriberValue()
+ if property == 'short_title':
+ if subscriber.getSubscriptionUrl()==self.subscription_url['two_way']:
+ resolve = 1
+ conflict.applySubscriberValue()
+ if not resolve:
+ conflict.applyPublisherValue()
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ self.assertEqual(document_c1.getDescription(), self.description2)
+ self.assertEqual(document_c1.getShortTitle(), self.short_title3)
+ self.assertEqual(document_c1.getSourceReference(), self.filename_ppt)
+ self.assertEquals(self.size_filename_ppt, document_c1.get_size())
+ document_s = document_server._getOb(self.id1)
+ document_c = document_client1._getOb(self.id1)
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c1)
+ # the workflow has one more "workflow" in document_c1
+ #self.synchronize(self.sub_id1)
+ #self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c1)
+
+ def test_09_SynchronizeWorkflowHistory(self, quiet=0, run=run_all_test):
+ """
+ We will do a synchronization, then we will edit two times
+ the object on the server, then two times the object on the
+ client, and see if the global history as 4 more actions.
+ """
+ if not run: return
+ self.test_02_FirstSynchronization(quiet=1, run=1)
+ if not quiet:
+ ZopeTestCase._print('\nTest Synchronize WorkflowHistory ')
+ LOG('Testing... ', 0, 'test_09_SynchronizeWorkflowHistory')
+ document_server = self.getDocumentServer()
+ document_s = document_server._getOb(self.id1)
+ document_client1 = self.getDocumentClient1()
+ document_c = document_client1._getOb(self.id1)
+ kw1 = {'short_title':self.short_title1}
+ kw2 = {'short_title':self.short_title2}
+ len_wf = len(document_s.workflow_history[self.workflow_id])
+ document_s.edit(**kw2)
+ document_c.edit(**kw2)
+ document_s.edit(**kw1)
+ document_c.edit(**kw1)
+ self.synchronize(self.sub_id1)
+ self.checkSynchronizationStateIsSynchronized()
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c)
+ self.assertEqual(len(document_s.workflow_history[self.workflow_id]), len_wf)
+ self.assertEqual(len(document_c.workflow_history[self.workflow_id]), len_wf)
+
+ def test_10_BrokenMessage(self, quiet=0, run=run_all_test):
+ """
+ With http synchronization, when a message is not well
+ received, then we send message again, we want to
+ be sure that is such case we don't do stupid things
+
+ If we want to make this test more intersting, it is
+ better to split messages
+ """
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Broken Message ')
+ LOG('Testing... ', 0, 'test_10_BrokenMessage')
+ previous_max_lines = SyncCode.MAX_LINES
+ SyncCode.MAX_LINES = 10
+ nb_document = self.documentServer(quiet=1, run =1)
+ # Synchronize the first client
+ nb_message1 = self.synchronizeWithBrokenMessage(self.sub_id1)
+ #self.failUnless(nb_message1==self.nb_message_first_synchronization)
+ portal_sync = self.getSynchronizationTool()
+ subscription1 = portal_sync.getSubscription(self.sub_id1)
+ self.assertEqual(len(subscription1.getObjectList()), nb_document)
+ document_server = self.getDocumentServer() # We also check we don't
+ # modify initial ob
+ document_s = document_server._getOb(self.id1)
+ document_client1 = self.getDocumentClient1()
+ document_c = document_client1._getOb(self.id1)
+ self.assertEqual(document_s.getId(), self.id1)
+ self.assertEqual(document_s.getReference(), self.reference1)
+ self.assertEqual(document_s.getLanguage(), self.language1)
+ self.assertEqual(document_s.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, document_c.get_size())
+ self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c)
+ SyncCode.MAX_LINES = previous_max_lines
+
+ def test_11_AddOneWaySubscription(self, quiet=0, run=run_all_test):
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest Add One Way Subscription ')
+ LOG('Testing... ', 0, 'test_11_AddOneWaySubscription')
+ portal_id = self.getPortalId()
+ portal_sync = self.getSynchronizationTool()
+ portal_sync.manage_addSubscription(title=self.sub_id_from_server,
+ publication_url=self.publication_url,
+ subscription_url=self.subscription_url['from_server'],
+ destination_path='/%s/document_client_from_server' % portal_id,
+ source_uri='Document',
+ target_uri='Document',
+ query='objectValues',
+ xml_mapping=self.xml_mapping,
+ conduit='ERP5DocumentConduit',
+ gpg_key='',
+ activity_enabled=True,
+ alert_code = SyncCode.ONE_WAY_FROM_SERVER,
+ login = 'daniele',
+ password = 'myPassword')
+ sub = portal_sync.getSubscription(self.sub_id_from_server)
+ self.assertTrue(sub.isOneWayFromServer())
+ self.failUnless(sub is not None)
+
+ def test_12_OneWaySync(self, quiet=0, run=run_all_test):
+ """
+ We will test if we can synchronize only from to server to the client.
+ We want to make sure in this case that all modifications on the client
+ will not be taken into account.
+ """
+ if not run: return
+ if not quiet:
+ ZopeTestCase._print('\nTest One Way Sync ')
+ LOG('Testing... ', 0, 'test_12_OneWaySync')
+ self.test_11_AddOneWaySubscription(quiet=1, run=1)
+ nb_document = self.documentServer(quiet=1, run=1, OneWay=True)
+ portal_sync = self.getSynchronizationTool()
+ sub_from_server = portal_sync.getSubscription(self.sub_id_from_server)
+ self.assertEquals(sub_from_server.getSynchronizationType(), SyncCode.SLOW_SYNC)
+ # First do the sync from the server to the client
+ nb_message1 = self.synchronize(self.sub_id_from_server)
+ sub_from_server = portal_sync.getSubscription(self.sub_id_from_server)
+ self.assertEquals(sub_from_server.getSynchronizationType(), SyncCode.ONE_WAY_FROM_SERVER)
+ self.assertEquals(nb_message1, self.nb_message_first_synchronization)
+ self.assertEquals(len(sub_from_server.getObjectList()), nb_document)
+ document_server = self.getDocumentServer() # We also check we don't
+ # modify initial ob
+ document_s = document_server._getOb(self.id1)
+ document_client1 = self.getDocumentClientFromServer()
+ document_c = document_client1._getOb(self.id1)
+ self.assertEqual(document_s.getId(), self.id1)
+ self.assertEqual(document_s.getReference(), self.reference1)
+ self.assertEqual(document_s.getLanguage(), self.language1)
+ self.checkSynchronizationStateIsSynchronized()
+ self.assertXMLViewIsEqual(self.sub_id_from_server, document_s, document_c, force=1)
+ # Then we change things on both sides and we look if there
+ # is synchronization from only one way
+ file = makeFileUpload(self.filename_odt)
+ document_c.edit(file=file)
+ #document_c.convertToBaseFormat()
+ kw = {'short_title' : self.short_title2}
+ document_s.edit(**kw)
+ transaction.commit()
+ self.tic()
+ self.assertEqual(document_s.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, document_s.get_size())
+ nb_message1 = self.synchronize(self.sub_id_from_server)
+ #In One_From_Server Sync not modify the first_name in client because any
+ #datas client sent
+ self.assertEqual(document_c.getSourceReference(), self.filename_odt)
+ self.assertEquals(self.size_filename_odt, document_c.get_size())
+ self.assertEquals(document_c.getShortTitle(), self.short_title2)
+ self.assertEqual(document_s.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, document_s.get_size())
+ self.assertEquals(document_s.getShortTitle(), self.short_title2)
+
+ #reset for refresh sync
+ #after synchronize, the client object retrieve value of server
+ self.resetSignaturePublicationAndSubscription()
+ nb_message1 = self.synchronize(self.sub_id_from_server)
+ self.assertEqual(document_c.getSourceReference(), self.filename_text)
+ self.assertEquals(self.size_filename_text, document_c.get_size())
+ self.assertEquals(document_c.getShortTitle(), self.short_title2)
+ self.checkSynchronizationStateIsSynchronized()
+ document_s = document_server._getOb(self.id1)
+ document_c = document_client1._getOb(self.id1)
+ # FIXME we have to manage workflow
+ #self.assertXMLViewIsEqual(self.sub_id1, document_s, document_c, force=1)
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestERP5DocumentSyncML))
+ return suite
More information about the Erp5-report
mailing list