[Erp5-report] r31895 kazuhiko - /erp5/trunk/products/ERP5/tests/testERP5Simulation.py

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Jan 22 10:07:01 CET 2010


Author: kazuhiko
Date: Fri Jan 22 10:06:59 2010
New Revision: 31895

URL: http://svn.erp5.org?rev=31895&view=rev
Log:
include invoice tests.

Modified:
    erp5/trunk/products/ERP5/tests/testERP5Simulation.py

Modified: erp5/trunk/products/ERP5/tests/testERP5Simulation.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testERP5Simulation.py?rev=31895&r1=31894&r2=31895&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/tests/testERP5Simulation.py [utf8] (original)
+++ erp5/trunk/products/ERP5/tests/testERP5Simulation.py [utf8] Fri Jan 22 10:06:59 2010
@@ -36,20 +36,411 @@
 from Products.CMFCore.utils import getToolByName
 from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
 from Products.ERP5Type.tests.Sequence import SequenceList
-from testPackingList import TestPackingList, TestPackingListMixin
-
-class TestERP5SimulationMixin(TestPackingListMixin):
+from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
+from testPackingList import TestPackingList
+from testInvoice import TestSaleInvoice, TestInvoiceMixin
+
+class TestERP5SimulationMixin(TestInvoiceMixin):
   def getBusinessTemplateList(self):
-    return list(TestPackingListMixin.getBusinessTemplateList(self)) + \
-           ['erp5_simulation',]
+    return list(TestInvoiceMixin.getBusinessTemplateList(self)) + \
+           ['erp5_administration', 'erp5_simulation',]
 
   def afterSetUp(self, quiet=1, run=1):
-    TestPackingListMixin.afterSetUp(self, quiet, run)
+    TestInvoiceMixin.afterSetUp(self)
     portal_rules = self.portal.portal_rules
     for rule in portal_rules.objectValues(portal_type='Order Rule'):
       if rule.getValidationState() == 'validated':
         rule.invalidate()
     self.validateNewRules()
+
+  @UnrestrictedMethod
+  def createInvoiceTransactionRule(self, resource=None):
+    """Create a sale invoice transaction rule with only one cell for
+    product_line/apparel and default_region
+    The accounting rule cell will have the provided resource, but this is more
+    or less optional (as long as price currency is set correctly on order)
+    """
+    portal = self.portal
+    account_module = portal.account_module
+    for account_id, account_gap, account_type \
+               in self.account_definition_list:
+      if not account_id in account_module.objectIds():
+        account = account_module.newContent(id=account_id)
+        account.setGap(account_gap)
+        account.setAccountType(account_type)
+        portal.portal_workflow.doActionFor(account, 'validate_action')
+
+    invoice_rule = portal.portal_rules.default_invoice_transaction_rule
+    if invoice_rule.getValidationState() == 'validated':
+      invoice_rule.invalidate()
+    invoice_rule.deleteContent(list(invoice_rule.objectIds()))
+    transaction.commit()
+    self.tic()
+    region_predicate = invoice_rule.newContent(portal_type = 'Predicate')
+    product_line_predicate = invoice_rule.newContent(portal_type = 'Predicate')
+    region_predicate.edit(
+      membership_criterion_base_category_list = ['destination_region'],
+      membership_criterion_category_list =
+                   ['destination_region/region/%s' % self.default_region ],
+      int_index = 1,
+      string_index = 'region'
+    )
+    product_line_predicate.edit(
+      membership_criterion_base_category_list = ['product_line'],
+      membership_criterion_category_list =
+                            ['product_line/apparel'],
+      int_index = 1,
+      string_index = 'product'
+    )
+    product_line_predicate.immediateReindexObject()
+    region_predicate.immediateReindexObject()
+
+    invoice_rule.updateMatrix()
+    cell_list = invoice_rule.getCellValueList(base_id='movement')
+    self.assertEquals(len(cell_list),1)
+    cell = cell_list[0]
+
+    for line_id, line_source_id, line_destination_id, line_ratio in \
+        self.transaction_line_definition_list:
+      line = cell.newContent(id=line_id,
+          portal_type='Accounting Transaction Line', quantity=line_ratio,
+          resource_value=resource,
+          source_value=account_module[line_source_id],
+          destination_value=account_module[line_destination_id])
+
+    # matching provider for source and destination
+    for category in ('source', 'destination',):
+      invoice_rule.newContent(
+        portal_type='Category Membership Divergence Tester',
+        title='%s divergence tester' % category,
+        tested_property=category,
+        divergence_provider=False,
+        matching_provider=True)
+    # matching provider for quantity (i.e. only used for expand)
+    invoice_rule.newContent(
+      portal_type='Net Converted Quantity Divergence Tester',
+      title='%s divergence tester' % category,
+      tested_property='quantity',
+      quantity=0,
+      divergence_provider=False,
+      matching_provider=True)
+    invoice_rule.validate()
+    transaction.commit()
+    self.tic()
+
+  def validateNewRules(self):
+    # validate the 'New Default Order Rule' document if not validated yet.
+    portal_rules = self.portal.portal_rules
+    new_order_rule = filter(
+      lambda x:x.title == 'New Default Order Rule',
+      portal_rules.objectValues(portal_type='Order Rule'))[0]
+    if new_order_rule.getValidationState() != 'validated':
+      new_order_rule.validate()
+
+  def _acceptDecisionQuantity(self, document):
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(document)
+    quantity_solver_decision = filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
+      solver_process.contentValues())[0]
+    # use Quantity Accept Solver.
+    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Accept Solver'])
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+
+  def _acceptDivergenceOnInvoice(self, invoice, divergence_list):
+    print invoice, divergence_list
+    return self._acceptDecisionQuantity(invoice)
+
+  def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None):
+    """
+    Solve quantity divergence by using solver tool.
+    """
+    packing_list = sequence.get('packing_list')
+    self._acceptDecisionQuantity(packing_list)
+
+  def stepAcceptDecisionQuantityInvoice(self, sequence=None,
+                                        sequence_list=None):
+    """
+    Solve quantity divergence by using solver tool.
+    """
+    invoice = sequence.get('invoice')
+    self._acceptDecisionQuantity(invoice)
+
+  def stepAcceptDecisionResource(self,sequence=None, sequence_list=None):
+    """
+    Solve resource divergence by using solver tool.
+    """
+    packing_list = sequence.get('packing_list')
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(packing_list)
+    resource_solver_decision = filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='resource',
+      solver_process.contentValues())[0]
+    # use Resource Replacement Solver.
+    resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Replacement Solver'])
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+
+  def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None):
+    """
+      Do the split and defer action
+    """
+    packing_list = sequence.get('packing_list')
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(packing_list)
+    quantity_solver_decision = filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
+      solver_process.contentValues())[0]
+    # use Quantity Split Solver.
+    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver'])
+    # configure for Quantity Split Solver.
+    kw = {'delivery_solver':'FIFO',
+          'start_date':self.datetime + 15,
+          'stop_date':self.datetime + 25}
+    quantity_solver_decision.updateConfiguration(**kw)
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+    # build split deliveries manually. XXX ad-hoc
+    previous_tag = None
+    for delivery_builder in packing_list.getBuilderList():
+      this_builder_tag = '%s_split_%s' % (packing_list.getPath(),
+                                          delivery_builder.getId())
+      after_tag = []
+      if previous_tag:
+        after_tag.append(previous_tag)
+      delivery_builder.activate(
+        after_method_id=('solve',
+                         'immediateReindexObject',
+                         'recursiveImmediateReindexObject',), # XXX too brutal.
+        after_tag=after_tag,
+        ).build(explanation_uid=packing_list.getCausalityValue().getUid())
+
+  def _adoptPrevisionQuantity(self, packing_list):
+    """
+    Solve quantity divergence by using solver tool.
+    """
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(packing_list)
+    quantity_solver_decision = filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
+      solver_process.contentValues())[0]
+    # use Quantity Adoption Solver.
+    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Adoption Solver'])
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+
+  def _adoptDivergenceOnInvoice(self, invoice, divergence_list):
+    print invoice, divergence_list
+    return self._adoptPrevisionQuantity(invoice)
+
+  def _adoptDivergenceOnPackingList(self, packing_list, divergence_list):
+    print packing_list, divergence_list
+    return self._adoptPrevisionQuantity(packing_list)
+
+  def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None):
+    """
+    Solve quantity divergence by using solver tool.
+    """
+    packing_list = sequence.get('packing_list')
+    self._adoptPrevisionQuantity(packing_list)
+
+  def stepNewPackingListAdoptPrevisionQuantity(self, sequence=None,
+                                               sequence_list=None):
+    """
+    Solve quantity divergence by using solver tool.
+    """
+    packing_list = sequence.get('new_packing_list')
+    self._adoptPrevisionQuantity(packing_list)
+
+  def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None):
+    """
+    Solve resource divergence by using solver tool.
+    """
+    packing_list = sequence.get('packing_list')
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(packing_list)
+    resource_solver_decision = filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='resource',
+      solver_process.contentValues())[0]
+    # use Resource Adoption Solver.
+    resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Adoption Solver'])
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+
+  def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None):
+    """
+      Look if the packing list has new previsions
+    """
+    old_packing_list_line = sequence.get('packing_list_line')
+    packing_list_line = old_packing_list_line.aq_parent[str(int(old_packing_list_line.getId())-1)]
+    resource = sequence.get('resource')
+    for line in sequence.get('packing_list').getMovementList():
+      self.assertEquals(line.getResourceValue(), resource)
+      self.assertEquals(line.getQuantity(), self.default_quantity)
+      self.assertEquals(line.getCausalityList(),
+                        [x.getOrder() for x in \
+                         line.getDeliveryRelatedValueList()])
+
+  def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None):
+    """
+      Solve destination divergence by using solver tool.
+    """
+    packing_list = sequence.get('packing_list')
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(packing_list)
+    for destination_solver_decision in filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='destination',
+      solver_process.contentValues()):
+      # use Destination Replacement Solver.
+      destination_solver_decision.setSolverValue(self.portal.portal_types['Destination Replacement Solver'])
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+
+  def _unifyStartDateWithDecision(self, document):
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(document)
+    for start_date_solver_decision in filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
+      solver_process.contentValues()):
+      # use StartDate Replacement Solver.
+      start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
+      # configure for Start Date Replacement Solver.
+      kw = {'value':document.getStartDate()}
+      start_date_solver_decision.updateConfiguration(**kw)
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+
+  def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None):
+    packing_list = sequence.get('packing_list')
+    self._unifyStartDateWithDecision(packing_list)
+
+  def stepUnifyStartDateWithDecisionInvoice(self,sequence=None, sequence_list=None):
+    invoice = sequence.get('invoice')
+    self._unifyStartDateWithDecision(invoice)
+
+  def stepUnifyStartDateWithPrevision(self,sequence=None, sequence_list=None):
+    """
+      Solve start date divergence with the last simulation movement's start date
+    """
+    packing_list = sequence.get('packing_list')
+    applied_rule = sequence.get('applied_rule')
+    simulation_line_list = applied_rule.objectValues()
+    start_date = simulation_line_list[-1].getStartDate()
+    solver_tool = self.portal.portal_solvers
+    solver_process = solver_tool.newSolverProcess(packing_list)
+    for start_date_solver_decision in filter(
+      lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
+      solver_process.contentValues()):
+      # use StartDate Replacement Solver.
+      start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
+      # configure for Start Date Replacement Solver.
+      kw = {'value':start_date}
+      start_date_solver_decision.updateConfiguration(**kw)
+    solver_process.buildTargetSolverList()
+    solver_process.solve()
+
+  def checkOrderRuleSimulation(self, rule_reference, sequence=None, sequence_list=None):
+    """
+      Test if simulation is matching order, be sure that rule_reference is used
+      to expand simulation for order
+    """
+    order = sequence.get('order')
+    related_applied_rule_list = order.getCausalityRelatedValueList( \
+                                   portal_type=self.applied_rule_portal_type)
+    no_applied_rule_state = ('draft', 'auto_planned')
+    order_state = order.getSimulationState()
+
+    if order_state in no_applied_rule_state:
+      self.assertEquals(0, len(related_applied_rule_list))
+    else:
+      LOG('stepCheckOrderRuleSimulation', 0, 'related_applied_rule_list: %s' %
+                   str([x.getObject() for x in related_applied_rule_list]))
+      self.assertEquals(1, len(related_applied_rule_list))
+      applied_rule = related_applied_rule_list[0].getObject()
+      sequence.edit(applied_rule=applied_rule)
+      self.failUnless(applied_rule is not None)
+      self.failUnless(order_state, \
+                      applied_rule.getLastExpandSimulationState())
+
+      # Test if applied rule has a specialise value with passed rule_reference
+      portal_rules = getToolByName(order, 'portal_rules')
+      self.assertEquals(rule_reference,
+                        applied_rule.getSpecialiseReference())
+
+      simulation_movement_list = applied_rule.objectValues()
+      sequence.edit(simulation_movement_list=simulation_movement_list)
+
+      # Count the number of movement in order
+      movement_list = order.getMovementList()
+      # Check if number of unique movement is equal to number of
+      # simulation movement
+      unique_movement_list = dict(
+        [('%r,%r,%r' % (x.getResource(), x.getVariationCategoryList(),
+          x.getVariationPropertyDict())), x] for x in movement_list).values()
+      self.assertEquals(len(unique_movement_list),
+                        len(simulation_movement_list))
+      # Check if all movements are related to simulation movements
+      order_movement_list = sum([x.getOrderValueList() for x in \
+                                 simulation_movement_list], [])
+      self.failIfDifferentSet(movement_list, order_movement_list)
+
+      # Check each simulation movement
+      for simulation_movement in simulation_movement_list:
+        order_movement_list = simulation_movement.getOrderValueList()
+        # Test quantity
+        self.assertEquals(sum([x.getQuantity() for x in order_movement_list]),
+                          simulation_movement.getQuantity())
+        for order_movement in order_movement_list:
+          # Test price
+          self.assertEquals(order_movement.getPrice(), \
+                            simulation_movement.getPrice())
+          # Test resource
+          self.assertEquals(order_movement.getResource(), \
+                            simulation_movement.getResource())
+          # Test resource variation
+          self.assertEquals(order_movement.getVariationText(), \
+                            simulation_movement.getVariationText())
+          self.assertEquals(order_movement.getVariationCategoryList(), \
+                            simulation_movement.getVariationCategoryList())
+          # XXX Test acquisition
+          self.checkAcquisition(simulation_movement, order_movement)
+
+  def stepModifySimulationLineQuantityForMergedLine(self,sequence=None, sequence_list=None):
+    """
+      Modify the quantity of the simulation movement and of its order movement
+    """
+    applied_rule = sequence.get('applied_rule')
+    simulation_line_list = applied_rule.objectValues()
+    self.assertEquals(len(simulation_line_list), 1)
+    for simulation_line in simulation_line_list:
+      simulation_line.edit(quantity=self.default_quantity-2)
+      simulation_line.getOrderValue().edit(quantity=self.default_quantity-2)
+
+  def stepCheckSimulationQuantityUpdatedForMergedLine(self,sequence=None, sequence_list=None):
+    """
+      Test if the quantity of the simulation movement was changed
+    """
+    applied_rule = sequence.get('applied_rule')
+    simulation_line_list = applied_rule.objectValues()
+    self.assertEquals(len(simulation_line_list), 1)
+    for simulation_line in simulation_line_list:
+      self.assertEquals(simulation_line.getQuantity() + \
+                        simulation_line.getDeliveryError(),
+                        self.default_quantity * 2)
+
+  def stepCheckPackingListLineWithDifferentResource(self,sequence=None, sequence_list=None):
+    """
+      Look if the packing list has new previsions
+    """
+    packing_list_line = sequence.get('packing_list_line')
+    new_resource = sequence.get('resource')
+    self.assertEquals(packing_list_line.getQuantity(), self.default_quantity*2)
+    self.assertEquals(packing_list_line.getResourceValue(), new_resource)
+    simulation_line_list = packing_list_line.getDeliveryRelatedValueList()
+    order_line_list = sum([x.getOrderList() for x in simulation_line_list], [])
+    self.assertEquals(sorted(packing_list_line.getCausalityList()),
+                      sorted(order_line_list))
 
 class TestERP5Simulation(TestERP5SimulationMixin, ERP5TypeTestCase):
   run_all_test = 1
@@ -205,298 +596,14 @@
     sequence_list.play(self, quiet=quiet)
 
 class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList):
-  def validateNewRules(self):
-    # create an Order Rule document.
-    portal_rules = self.portal.portal_rules
-    new_order_rule = filter(
-      lambda x:x.title == 'New Default Order Rule',
-      portal_rules.objectValues(portal_type='Order Rule'))[0]
-    if new_order_rule.getValidationState() != 'validated':
-      new_order_rule.validate()
-
-  def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None, **kw):
-    """
-    Solve quantity divergence by using solver tool.
-    """
-    packing_list = sequence.get('packing_list')
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    quantity_solver_decision = filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
-      solver_process.contentValues())[0]
-    # use Quantity Accept Solver.
-    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Accept Solver'])
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-    # XXX-JPS We do not need the divergence message anymore.
-    # since the divergence message == the divergence tester itself
-    # with its title, description, tested property, etc.
-
-  def stepAcceptDecisionResource(self,sequence=None, sequence_list=None, **kw):
-    """
-    Solve quantity divergence by using solver tool.
-    """
-    packing_list = sequence.get('packing_list')
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    resource_solver_decision = filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='resource',
-      solver_process.contentValues())[0]
-    # use Resource Replacement Solver.
-    resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Replacement Solver'])
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-
-  def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw):
-    """
-      Do the split and defer action
-    """
-    packing_list = sequence.get('packing_list')
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    quantity_solver_decision = filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
-      solver_process.contentValues())[0]
-    # use Quantity Split Solver.
-    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver'])
-    # configure for Quantity Split Solver.
-    kw = {'delivery_solver':'FIFO',
-          'start_date':self.datetime + 15,
-          'stop_date':self.datetime + 25}
-    quantity_solver_decision.updateConfiguration(**kw)
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-    # build split deliveries manually. XXX ad-hoc
-    previous_tag = None
-    for delivery_builder in packing_list.getBuilderList():
-      this_builder_tag = '%s_split_%s' % (packing_list.getPath(),
-                                          delivery_builder.getId())
-      after_tag = []
-      if previous_tag:
-        after_tag.append(previous_tag)
-      delivery_builder.activate(
-        after_method_id=('solve',
-                         'immediateReindexObject',
-                         'recursiveImmediateReindexObject',), # XXX too brutal.
-        after_tag=after_tag,
-        ).build(explanation_uid=packing_list.getCausalityValue().getUid())
-
-  def _adoptPrevisionQuantity(self, packing_list):
-    """
-    Solve quantity divergence by using solver tool.
-    """
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    quantity_solver_decision = filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
-      solver_process.contentValues())[0]
-    # use Quantity Adoption Solver.
-    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Adoption Solver'])
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-
-  def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None, **kw):
-    """
-    Solve quantity divergence by using solver tool.
-    """
-    packing_list = sequence.get('packing_list')
-    self._adoptPrevisionQuantity(packing_list)
-
-  def stepNewPackingListAdoptPrevisionQuantity(self, sequence=None,
-                                               sequence_list=None, **kw):
-    """
-    Solve quantity divergence by using solver tool.
-    """
-    packing_list = sequence.get('new_packing_list')
-    self._adoptPrevisionQuantity(packing_list)
-
-  def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None, **kw):
-    """
-    Solve resource divergence by using solver tool.
-    """
-    packing_list = sequence.get('packing_list')
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    resource_solver_decision = filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='resource',
-      solver_process.contentValues())[0]
-    # use Resource Adopt Solver.
-    resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Adoption Solver'])
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-
-  def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None, **kw):
-    """
-      Look if the packing list has new previsions
-    """
-    old_packing_list_line = sequence.get('packing_list_line')
-    packing_list_line = old_packing_list_line.aq_parent[str(int(old_packing_list_line.getId())-1)]
-    resource = sequence.get('resource')
-    for line in sequence.get('packing_list').getMovementList():
-      self.assertEquals(line.getResourceValue(), resource)
-      self.assertEquals(line.getQuantity(), self.default_quantity)
-      self.assertEquals(line.getCausalityList(),
-                        [x.getOrder() for x in \
-                         line.getDeliveryRelatedValueList()])
-
-  def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None, **kw):
-    """
-      Check if simulation movement are disconnected
-    """
-    packing_list = sequence.get('packing_list')
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    for destination_solver_decision in filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='destination',
-      solver_process.contentValues()):
-      # use Destination Replacement Solver.
-      destination_solver_decision.setSolverValue(self.portal.portal_types['Destination Replacement Solver'])
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-
-  def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None, **kw):
-    """
-      Check if simulation movement are disconnected
-    """
-    packing_list = sequence.get('packing_list')
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    for start_date_solver_decision in filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
-      solver_process.contentValues()):
-      # use StartDate Replacement Solver.
-      start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
-      # configure for Quantity Split Solver.
-      kw = {'value':packing_list.getStartDate()}
-      start_date_solver_decision.updateConfiguration(**kw)
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-
-  def stepUnifyStartDateWithPrevision(self,sequence=None, sequence_list=None, **kw):
-    """
-      Check if simulation movement are disconnected
-    """
-    packing_list = sequence.get('packing_list')
-    applied_rule = sequence.get('applied_rule')
-    simulation_line_list = applied_rule.objectValues()
-    start_date = simulation_line_list[-1].getStartDate()
-    solver_tool = self.portal.portal_solvers
-    solver_process = solver_tool.newSolverProcess(packing_list)
-    for start_date_solver_decision in filter(
-      lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
-      solver_process.contentValues()):
-      # use StartDate Replacement Solver.
-      start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
-      # configure for Quantity Split Solver.
-      kw = {'value':start_date}
-      start_date_solver_decision.updateConfiguration(**kw)
-    solver_process.buildTargetSolverList()
-    solver_process.solve()
-
-  def checkOrderRuleSimulation(self, rule_reference, sequence=None, sequence_list=None, **kw):
-    """
-      Test if simulation is matching order, be sure that rule_reference is used
-      to expand simulation for order
-    """
-    order = sequence.get('order')
-    related_applied_rule_list = order.getCausalityRelatedValueList( \
-                                   portal_type=self.applied_rule_portal_type)
-    no_applied_rule_state = ('draft', 'auto_planned')
-    order_state = order.getSimulationState()
-
-    if order_state in no_applied_rule_state:
-      self.assertEquals(0, len(related_applied_rule_list))
-    else:
-      LOG('stepCheckOrderRuleSimulation', 0, 'related_applied_rule_list: %s' %
-                   str([x.getObject() for x in related_applied_rule_list]))
-      self.assertEquals(1, len(related_applied_rule_list))
-      applied_rule = related_applied_rule_list[0].getObject()
-      sequence.edit(applied_rule=applied_rule)
-      self.failUnless(applied_rule is not None)
-      self.failUnless(order_state, \
-                      applied_rule.getLastExpandSimulationState())
-
-      # Test if applied rule has a specialise value with passed rule_reference
-      portal_rules = getToolByName(order, 'portal_rules')
-      self.assertEquals(rule_reference,
-                        applied_rule.getSpecialiseReference())
-
-      simulation_movement_list = applied_rule.objectValues()
-      sequence.edit(simulation_movement_list=simulation_movement_list)
-
-      # Count the number of movement in order
-      movement_list = order.getMovementList()
-      # Check if number of unique movement is equal to number of
-      # simulation movement
-      unique_movement_list = dict(
-        [('%r,%r,%r' % (x.getResource(), x.getVariationCategoryList(),
-          x.getVariationPropertyDict())), x] for x in movement_list).values()
-      self.assertEquals(len(unique_movement_list),
-                        len(simulation_movement_list))
-      # Check if all movements are related to simulation movements
-      order_movement_list = sum([x.getOrderValueList() for x in \
-                                 simulation_movement_list], [])
-      self.failIfDifferentSet(movement_list, order_movement_list)
-
-      # Check each simulation movement
-      for simulation_movement in simulation_movement_list:
-        order_movement_list = simulation_movement.getOrderValueList()
-        # Test quantity
-        self.assertEquals(sum([x.getQuantity() for x in order_movement_list]),
-                          simulation_movement.getQuantity())
-        for order_movement in order_movement_list:
-          # Test price
-          self.assertEquals(order_movement.getPrice(), \
-                            simulation_movement.getPrice())
-          # Test resource
-          self.assertEquals(order_movement.getResource(), \
-                            simulation_movement.getResource())
-          # Test resource variation
-          self.assertEquals(order_movement.getVariationText(), \
-                            simulation_movement.getVariationText())
-          self.assertEquals(order_movement.getVariationCategoryList(), \
-                            simulation_movement.getVariationCategoryList())
-          # XXX Test acquisition
-          self.checkAcquisition(simulation_movement, order_movement)
-
-  def stepModifySimulationLineQuantityForMergedLine(self,sequence=None, sequence_list=None, **kw):
-    """
-      Check if simulation movement are disconnected
-    """
-    applied_rule = sequence.get('applied_rule')
-    simulation_line_list = applied_rule.objectValues()
-    self.assertEquals(len(simulation_line_list), 1)
-    for simulation_line in simulation_line_list:
-      simulation_line.edit(quantity=self.default_quantity-2)
-      simulation_line.getOrderValue().edit(quantity=self.default_quantity-2)
-
-  def stepCheckSimulationQuantityUpdatedForMergedLine(self,sequence=None, sequence_list=None, **kw):
-    """
-      Test if the quantity of the simulation movement was changed
-    """
-    applied_rule = sequence.get('applied_rule')
-    simulation_line_list = applied_rule.objectValues()
-    self.assertEquals(len(simulation_line_list), 1)
-    for simulation_line in simulation_line_list:
-      self.assertEquals(simulation_line.getQuantity() + \
-                        simulation_line.getDeliveryError(),
-                        self.default_quantity * 2)
-
-  def stepCheckPackingListLineWithDifferentResource(self,sequence=None, sequence_list=None, **kw):
-    """
-      Look if the packing list has new previsions
-    """
-    packing_list_line = sequence.get('packing_list_line')
-    new_resource = sequence.get('resource')
-    self.assertEquals(packing_list_line.getQuantity(), self.default_quantity*2)
-    self.assertEquals(packing_list_line.getResourceValue(), new_resource)
-    simulation_line_list = packing_list_line.getDeliveryRelatedValueList()
-    order_line_list = sum([x.getOrderList() for x in simulation_line_list], [])
-    self.assertEquals(sorted(packing_list_line.getCausalityList()),
-                      sorted(order_line_list))
+  pass
+
+class TestERP5SimulationInvoice(TestERP5SimulationMixin, TestSaleInvoice):
+  pass
 
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestERP5Simulation))
   suite.addTest(unittest.makeSuite(TestERP5SimulationPackingList))
+  suite.addTest(unittest.makeSuite(TestERP5SimulationInvoice))
   return suite




More information about the Erp5-report mailing list