[Erp5-report] r19125 - /erp5/trunk/products/ERP5/tests/testNotificationTool.py

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Feb 7 14:44:16 CET 2008


Author: romain
Date: Thu Feb  7 14:44:15 2008
New Revision: 19125

URL: http://svn.erp5.org?rev=19125&view=rev
Log:
Check current behaviour of NotificationTool.

Added:
    erp5/trunk/products/ERP5/tests/testNotificationTool.py

Added: erp5/trunk/products/ERP5/tests/testNotificationTool.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testNotificationTool.py?rev=19125&view=auto
==============================================================================
--- erp5/trunk/products/ERP5/tests/testNotificationTool.py (added)
+++ erp5/trunk/products/ERP5/tests/testNotificationTool.py Thu Feb  7 14:44:15 2008
@@ -1,0 +1,442 @@
+##############################################################################
+#
+# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
+#          Romain Courteaud <romain at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsability of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# garantees and support are strongly adviced to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+import unittest
+
+from Testing import ZopeTestCase
+from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
+from AccessControl.SecurityManagement import newSecurityManager
+from zLOG import LOG
+from Products.ERP5Type.tests.Sequence import SequenceList
+from Products.ERP5Type.tests.utils import DummyMailHost
+from DateTime import DateTime
+import email
+from email.Header import decode_header, make_header
+from email.Utils import parseaddr
+
+# Copied from ERP5Type/patches/CMFMailIn.py
+def decode_email(file):
+  # Prepare result
+  theMail = {
+    'attachment_list': [],
+    'body': '',
+    # Place all the email header in the headers dictionary in theMail
+    'headers': {}
+  }
+  # Get Message
+  msg = email.message_from_string(file)
+  # Back up original file
+  theMail['__original__'] = file
+  # Recode headers to UTF-8 if needed
+  for key, value in msg.items():
+    decoded_value_list = decode_header(value)
+    unicode_value = make_header(decoded_value_list)
+    new_value = unicode_value.__unicode__().encode('utf-8')
+    theMail['headers'][key.lower()] = new_value
+  # Filter mail addresses
+  for header in ('resent-to', 'resent-from', 'resent-cc', 'resent-sender', 
+                 'to', 'from', 'cc', 'sender', 'reply-to'):
+    header_field = theMail['headers'].get(header)
+    if header_field:
+        theMail['headers'][header] = parseaddr(header_field)[1]
+  # Get attachments
+  body_found = 0
+  for part in msg.walk():
+    content_type = part.get_content_type()
+    file_name = part.get_filename()
+    # multipart/* are just containers
+    # XXX Check if data is None ?
+    if content_type.startswith('multipart'):
+      continue
+    # message/rfc822 contains attached email message
+    # next 'part' will be the message itself
+    # so we ignore this one to avoid doubling
+    elif content_type == 'message/rfc822':
+      continue
+    elif content_type == "text/plain":
+      charset = part.get_content_charset()
+      payload = part.get_payload(decode=True)
+      #LOG('CMFMailIn -> ',0,'charset: %s, payload: %s' % (charset,payload))
+      if charset:
+        payload = unicode(payload, charset).encode('utf-8')
+      if body_found:
+        # Keep the content type
+        theMail['attachment_list'].append((file_name, 
+                                           content_type, payload))
+      else:
+        theMail['body'] = payload
+        body_found = 1
+    else:
+      payload = part.get_payload(decode=True)
+      # Keep the content type
+      theMail['attachment_list'].append((file_name, content_type, 
+                                         payload))
+  return theMail
+
+class TestNotificationTool(ERP5TypeTestCase):
+  """
+  Test notification tool
+  """
+  run_all_test = 1
+  quiet = 1
+
+  def getBusinessTemplateList(self):
+    return ('erp5_base', )
+
+  def getTitle(self):
+    return "Notification Tool"
+
+  def afterSetUp(self):
+    portal = self.getPortal()
+    if 'MailHost' in portal.objectIds():
+      portal.manage_delObjects(['MailHost'])
+    portal._setObject('MailHost', DummyMailHost('MailHost'))
+    portal.email_from_address = 'site at example.invalid'
+    self.portal.portal_caches.clearAllCache()
+    get_transaction().commit()
+    self.tic()
+
+  def beforeTearDown(self):
+    get_transaction().abort()
+    # clear modules if necessary
+    self.portal.person_module.manage_delObjects(
+            list(self.portal.person_module.objectIds()))
+    get_transaction().commit()
+    self.tic()
+
+  def stepTic(self,**kw):
+    self.tic()
+
+  def stepAddUserA(self, sequence=None, sequence_list=None, **kw):
+    """
+    Create a user
+    """
+    person = self.portal.person_module.newContent(portal_type="Person",
+                                    reference="userA",
+                                    password="passwordA",
+                                    default_email_text="userA at example.invalid")
+    assignment = person.newContent(portal_type='Assignment')
+    assignment.open()
+
+  def stepAddUserB(self, sequence=None, sequence_list=None, **kw):
+    """
+    Create a user
+    """
+    person = self.portal.person_module.newContent(portal_type="Person",
+                                    reference="userB",
+                                    password="passwordA",
+                                    default_email_text="userB at example.invalid")
+    assignment = person.newContent(portal_type='Assignment')
+    assignment.open()
+
+  def stepAddUserWithoutEmail(self, sequence=None, sequence_list=None, **kw):
+    """
+    Create a user
+    """
+    person = self.portal.person_module.newContent(portal_type="Person",
+                                    reference="userWithoutEmail",
+                                    password="passwordA")
+    assignment = person.newContent(portal_type='Assignment')
+    assignment.open()
+
+  def test_01_defaultBehaviour(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default behaviour of sendMessage'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    self.assertRaises(
+      TypeError,
+      self.portal.portal_notifications.sendMessage,
+    )
+
+  def stepCheckNotificationWithoutSender(self, sequence=None, 
+                                         sequence_list=None, **kw):
+    """
+    Check that notification works without sender
+    """
+    self.portal.portal_notifications.sendMessage(
+        recipient='userA', subject='Subject', message='Message')
+    last_message = self.portal.MailHost._last_message
+    self.assertNotEquals((), last_message)
+    mfrom, mto, messageText = last_message
+    self.assertEquals('site at example.invalid', mfrom)
+    self.assertEquals(['userA at example.invalid'], mto)
+
+  def test_02_noSender(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    sequence_list = SequenceList()
+    sequence_string = '\
+        AddUserA \
+        Tic \
+        CheckNotificationWithoutSender \
+        '
+    sequence_list.addSequenceString(sequence_string)
+    sequence_list.play(self, quiet=quiet)
+
+  def stepCheckNotificationFailsWithoutSubject(self, sequence=None, 
+                                               sequence_list=None, **kw):
+    """
+    Check that notification fails when no subject is given
+    """
+    self.assertRaises(
+      TypeError,
+      self.portal.portal_notifications.sendMessage,
+        recipient='userA', message='Message'
+    )
+
+  def test_03_noSubject(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    sequence_list = SequenceList()
+    sequence_string = '\
+        AddUserA \
+        Tic \
+        CheckNotificationFailsWithoutSubject \
+        '
+    sequence_list.addSequenceString(sequence_string)
+    sequence_list.play(self, quiet=quiet)
+
+  def test_04_noRecipient(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    self.portal.portal_notifications.sendMessage(
+        subject='Subject', message='Message')
+    last_message = self.portal.MailHost._last_message
+    self.assertNotEquals((), last_message)
+    mfrom, mto, messageText = last_message
+    self.assertEquals('site at example.invalid', mfrom)
+    self.assertEquals(['site at example.invalid'], mto)
+
+  def stepCheckNotificationWithoutMessage(self, sequence=None, 
+                                          sequence_list=None, **kw):
+    """
+    Check that notification fails when no subject is given
+    """
+    self.portal.portal_notifications.sendMessage(
+        recipient='userA', subject='Subject', )
+    last_message = self.portal.MailHost._last_message
+    self.assertNotEquals((), last_message)
+    mfrom, mto, messageText = last_message
+    self.assertEquals('site at example.invalid', mfrom)
+    self.assertEquals(['userA at example.invalid'], mto)
+
+  def test_05_noMessage(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    sequence_list = SequenceList()
+    sequence_string = '\
+        AddUserA \
+        Tic \
+        CheckNotificationWithoutMessage \
+        '
+    sequence_list.addSequenceString(sequence_string)
+    sequence_list.play(self, quiet=quiet)
+
+  def stepCheckSimpleNotification(self, sequence=None, 
+                                  sequence_list=None, **kw):
+    """
+    Check that notification fails when no subject is given
+    """
+    self.portal.portal_notifications.sendMessage(
+        recipient='userA', subject='Subject', message='Message')
+    last_message = self.portal.MailHost._last_message
+    self.assertNotEquals((), last_message)
+    mfrom, mto, messageText = last_message
+    self.assertEquals('site at example.invalid', mfrom)
+    self.assertEquals(['userA at example.invalid'], mto)
+    # Check Message
+    mail_dict = decode_email(messageText)
+    self.assertEquals(mail_dict['headers']['subject'], 'Subject')
+    self.assertEquals(mail_dict['body'], 'Message')
+    self.assertSameSet([], mail_dict['attachment_list'])
+
+  def test_06_simpleMessage(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    sequence_list = SequenceList()
+    sequence_string = '\
+        AddUserA \
+        Tic \
+        CheckSimpleNotification \
+        '
+    sequence_list.addSequenceString(sequence_string)
+    sequence_list.play(self, quiet=quiet)
+
+  def stepCheckNotificationWithAttachment(self, sequence=None, 
+                                          sequence_list=None, **kw):
+    """
+    Check that notification fails when no subject is given
+    """
+    self.portal.portal_notifications.sendMessage(
+        recipient='userA', subject='Subject', message='Message',
+        attachment_list=[
+          {
+            'name': 'Attachment 1',
+            'content': 'Text 1',
+            'mime_type': 'text/plain',
+          },
+          {
+            'name': 'Attachment 2',
+            'content': 'Text 2',
+            'mime_type': 'application/octet-stream',
+          },
+        ])
+    last_message = self.portal.MailHost._last_message
+
+    self.assertNotEquals((), last_message)
+    mfrom, mto, messageText = last_message
+    self.assertEquals('site at example.invalid', mfrom)
+    self.assertEquals(['userA at example.invalid'], mto)
+
+    # Check Message
+    mail_dict = decode_email(messageText)
+    self.assertEquals(mail_dict['headers']['subject'], 'Subject')
+    self.assertEquals(mail_dict['body'], 'Message')
+    self.assertSameSet([('Attachment 1', 'text/plain', 'Text 1'),
+                        ('Attachment 2', 'application/octet-stream', 'Text 2')], 
+                       mail_dict['attachment_list'])
+
+  def test_07_AttachmentMessage(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    sequence_list = SequenceList()
+    sequence_string = '\
+        AddUserA \
+        Tic \
+        CheckNotificationWithAttachment \
+        '
+    sequence_list.addSequenceString(sequence_string)
+    sequence_list.play(self, quiet=quiet)
+
+  def stepCheckMultiRecipientNotification(self, sequence=None, 
+                                          sequence_list=None, **kw):
+    """
+    Check that notification fails when no subject is given
+    """
+    self.portal.portal_notifications.sendMessage(
+        recipient=['userA', 'userB'], subject='Subject', message='Message')
+    last_message = self.portal.MailHost._last_message
+
+    self.assertNotEquals((), last_message)
+    mfrom, mto, messageText = last_message
+    self.assertEquals('site at example.invalid', mfrom)
+    self.assertEquals(['userB at example.invalid'], mto)
+
+    previous_message = self.portal.MailHost._previous_message
+    self.assertNotEquals((), previous_message)
+    mfrom, mto, messageText = previous_message
+    self.assertEquals('site at example.invalid', mfrom)
+    self.assertEquals(['userA at example.invalid'], mto)
+
+  def test_08_MultiRecipient(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    sequence_list = SequenceList()
+    sequence_string = '\
+        AddUserA \
+        AddUserB \
+        Tic \
+        CheckMultiRecipientNotification \
+        '
+    sequence_list.addSequenceString(sequence_string)
+    sequence_list.play(self, quiet=quiet)
+
+  def stepCheckPersonWithoutEmail(self, sequence=None, 
+                                  sequence_list=None, **kw):
+    """
+    Check that notification fails when the destination hasn't a email adress
+    """
+    self.assertRaises(
+      AttributeError,
+      self.portal.portal_notifications.sendMessage,
+      recipient='userWithoutEmail', subject='Subject', message='Message'
+    )
+
+  def test_08_PersonWithoutEmail(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+
+    sequence_list = SequenceList()
+    sequence_string = '\
+        AddUserWithoutEmail \
+        Tic \
+        CheckPersonWithoutEmail \
+        '
+    sequence_list.addSequenceString(sequence_string)
+    sequence_list.play(self, quiet=quiet)
+
+  def test_09_InvalideRecipient(self, quiet=quiet, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = 'Test default sender value'
+      ZopeTestCase._print('\n%s ' % message)
+      LOG('Testing... ', 0, message)
+    self.assertRaises(
+      IndexError,
+      self.portal.portal_notifications.sendMessage,
+      recipient='UnknowUser', subject='Subject', message='Message'
+    )
+
+def test_suite():
+  suite = unittest.TestSuite()
+  suite.addTest(unittest.makeSuite(TestNotificationTool))
+  return suite




More information about the Erp5-report mailing list