[Erp5-report] r31895 kazuhiko - /erp5/trunk/products/ERP5/tests/testERP5Simulation.py
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Jan 22 10:07:01 CET 2010
Author: kazuhiko
Date: Fri Jan 22 10:06:59 2010
New Revision: 31895
URL: http://svn.erp5.org?rev=31895&view=rev
Log:
include invoice tests.
Modified:
erp5/trunk/products/ERP5/tests/testERP5Simulation.py
Modified: erp5/trunk/products/ERP5/tests/testERP5Simulation.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testERP5Simulation.py?rev=31895&r1=31894&r2=31895&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/tests/testERP5Simulation.py [utf8] (original)
+++ erp5/trunk/products/ERP5/tests/testERP5Simulation.py [utf8] Fri Jan 22 10:06:59 2010
@@ -36,20 +36,411 @@
from Products.CMFCore.utils import getToolByName
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList
-from testPackingList import TestPackingList, TestPackingListMixin
-
-class TestERP5SimulationMixin(TestPackingListMixin):
+from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
+from testPackingList import TestPackingList
+from testInvoice import TestSaleInvoice, TestInvoiceMixin
+
+class TestERP5SimulationMixin(TestInvoiceMixin):
def getBusinessTemplateList(self):
- return list(TestPackingListMixin.getBusinessTemplateList(self)) + \
- ['erp5_simulation',]
+ return list(TestInvoiceMixin.getBusinessTemplateList(self)) + \
+ ['erp5_administration', 'erp5_simulation',]
def afterSetUp(self, quiet=1, run=1):
- TestPackingListMixin.afterSetUp(self, quiet, run)
+ TestInvoiceMixin.afterSetUp(self)
portal_rules = self.portal.portal_rules
for rule in portal_rules.objectValues(portal_type='Order Rule'):
if rule.getValidationState() == 'validated':
rule.invalidate()
self.validateNewRules()
+
+ @UnrestrictedMethod
+ def createInvoiceTransactionRule(self, resource=None):
+ """Create a sale invoice transaction rule with only one cell for
+ product_line/apparel and default_region
+ The accounting rule cell will have the provided resource, but this his more
+ or less optional (as long as price currency is set correctly on order)
+ """
+ portal = self.portal
+ account_module = portal.account_module
+ for account_id, account_gap, account_type \
+ in self.account_definition_list:
+ if not account_id in account_module.objectIds():
+ account = account_module.newContent(id=account_id)
+ account.setGap(account_gap)
+ account.setAccountType(account_type)
+ portal.portal_workflow.doActionFor(account, 'validate_action')
+
+ invoice_rule = portal.portal_rules.default_invoice_transaction_rule
+ if invoice_rule.getValidationState() == 'validated':
+ invoice_rule.invalidate()
+ invoice_rule.deleteContent(list(invoice_rule.objectIds()))
+ transaction.commit()
+ self.tic()
+ region_predicate = invoice_rule.newContent(portal_type = 'Predicate')
+ product_line_predicate = invoice_rule.newContent(portal_type = 'Predicate')
+ region_predicate.edit(
+ membership_criterion_base_category_list = ['destination_region'],
+ membership_criterion_category_list =
+ ['destination_region/region/%s' % self.default_region ],
+ int_index = 1,
+ string_index = 'region'
+ )
+ product_line_predicate.edit(
+ membership_criterion_base_category_list = ['product_line'],
+ membership_criterion_category_list =
+ ['product_line/apparel'],
+ int_index = 1,
+ string_index = 'product'
+ )
+ product_line_predicate.immediateReindexObject()
+ region_predicate.immediateReindexObject()
+
+ invoice_rule.updateMatrix()
+ cell_list = invoice_rule.getCellValueList(base_id='movement')
+ self.assertEquals(len(cell_list),1)
+ cell = cell_list[0]
+
+ for line_id, line_source_id, line_destination_id, line_ratio in \
+ self.transaction_line_definition_list:
+ line = cell.newContent(id=line_id,
+ portal_type='Accounting Transaction Line', quantity=line_ratio,
+ resource_value=resource,
+ source_value=account_module[line_source_id],
+ destination_value=account_module[line_destination_id])
+
+ # matching provider for source and destination
+ for category in ('source', 'destination',):
+ invoice_rule.newContent(
+ portal_type='Category Membership Divergence Tester',
+ title='%s divergence tester' % category,
+ tested_property=category,
+ divergence_provider=False,
+ matching_provider=True)
+ # matching provider for quantity (i.e. only used for expand)
+ invoice_rule.newContent(
+ portal_type='Net Converted Quantity Divergence Tester',
+ title='%s divergence tester' % category,
+ tested_property='quantity',
+ quantity=0,
+ divergence_provider=False,
+ matching_provider=True)
+ invoice_rule.validate()
+ transaction.commit()
+ self.tic()
+
+ def validateNewRules(self):
+ # create an Order Rule document.
+ portal_rules = self.portal.portal_rules
+ new_order_rule = filter(
+ lambda x:x.title == 'New Default Order Rule',
+ portal_rules.objectValues(portal_type='Order Rule'))[0]
+ if new_order_rule.getValidationState() != 'validated':
+ new_order_rule.validate()
+
+ def _acceptDecisionQuantity(self, document):
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(document)
+ quantity_solver_decision = filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
+ solver_process.contentValues())[0]
+ # use Quantity Accept Solver.
+ quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Accept Solver'])
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+
+ def _acceptDivergenceOnInvoice(self, invoice, divergence_list):
+ print invoice, divergence_list
+ return self._acceptDecisionQuantity(invoice)
+
+ def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None):
+ """
+ Solve quantity divergence by using solver tool.
+ """
+ packing_list = sequence.get('packing_list')
+ self._acceptDecisionQuantity(packing_list)
+
+ def stepAcceptDecisionQuantityInvoice(self, sequence=None,
+ sequence_list=None):
+ """
+ Solve quantity divergence by using solver tool.
+ """
+ invoice = sequence.get('invoice')
+ self._acceptDecisionQuantity(invoice)
+
+ def stepAcceptDecisionResource(self,sequence=None, sequence_list=None):
+ """
+ Solve quantity divergence by using solver tool.
+ """
+ packing_list = sequence.get('packing_list')
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(packing_list)
+ resource_solver_decision = filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='resource',
+ solver_process.contentValues())[0]
+ # use Resource Replacement Solver.
+ resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Replacement Solver'])
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+
+ def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None):
+ """
+ Do the split and defer action
+ """
+ packing_list = sequence.get('packing_list')
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(packing_list)
+ quantity_solver_decision = filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
+ solver_process.contentValues())[0]
+ # use Quantity Split Solver.
+ quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver'])
+ # configure for Quantity Split Solver.
+ kw = {'delivery_solver':'FIFO',
+ 'start_date':self.datetime + 15,
+ 'stop_date':self.datetime + 25}
+ quantity_solver_decision.updateConfiguration(**kw)
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+ # build split deliveries manually. XXX ad-hoc
+ previous_tag = None
+ for delivery_builder in packing_list.getBuilderList():
+ this_builder_tag = '%s_split_%s' % (packing_list.getPath(),
+ delivery_builder.getId())
+ after_tag = []
+ if previous_tag:
+ after_tag.append(previous_tag)
+ delivery_builder.activate(
+ after_method_id=('solve',
+ 'immediateReindexObject',
+ 'recursiveImmediateReindexObject',), # XXX too brutal.
+ after_tag=after_tag,
+ ).build(explanation_uid=packing_list.getCausalityValue().getUid())
+
+ def _adoptPrevisionQuantity(self, packing_list):
+ """
+ Solve quantity divergence by using solver tool.
+ """
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(packing_list)
+ quantity_solver_decision = filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
+ solver_process.contentValues())[0]
+ # use Quantity Adoption Solver.
+ quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Adoption Solver'])
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+
+ def _adoptDivergenceOnInvoice(self, invoice, divergence_list):
+ print invoice, divergence_list
+ return self._adoptPrevisionQuantity(invoice)
+
+ def _adoptDivergenceOnPackingList(self, packing_list, divergence_list):
+ print packing_list, divergence_list
+ return self._adoptPrevisionQuantity(packing_list)
+
+ def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None):
+ """
+ Solve quantity divergence by using solver tool.
+ """
+ packing_list = sequence.get('packing_list')
+ self._adoptPrevisionQuantity(packing_list)
+
+ def stepNewPackingListAdoptPrevisionQuantity(self, sequence=None,
+ sequence_list=None):
+ """
+ Solve quantity divergence by using solver tool.
+ """
+ packing_list = sequence.get('new_packing_list')
+ self._adoptPrevisionQuantity(packing_list)
+
+ def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None):
+ """
+ Solve resource divergence by using solver tool.
+ """
+ packing_list = sequence.get('packing_list')
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(packing_list)
+ resource_solver_decision = filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='resource',
+ solver_process.contentValues())[0]
+ # use Resource Adopt Solver.
+ resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Adoption Solver'])
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+
+ def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None):
+ """
+ Look if the packing list has new previsions
+ """
+ old_packing_list_line = sequence.get('packing_list_line')
+ packing_list_line = old_packing_list_line.aq_parent[str(int(old_packing_list_line.getId())-1)]
+ resource = sequence.get('resource')
+ for line in sequence.get('packing_list').getMovementList():
+ self.assertEquals(line.getResourceValue(), resource)
+ self.assertEquals(line.getQuantity(), self.default_quantity)
+ self.assertEquals(line.getCausalityList(),
+ [x.getOrder() for x in \
+ line.getDeliveryRelatedValueList()])
+
+ def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None):
+ """
+ Check if simulation movement are disconnected
+ """
+ packing_list = sequence.get('packing_list')
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(packing_list)
+ for destination_solver_decision in filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='destination',
+ solver_process.contentValues()):
+ # use Destination Replacement Solver.
+ destination_solver_decision.setSolverValue(self.portal.portal_types['Destination Replacement Solver'])
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+
+ def _unifyStartDateWithDecision(self, document):
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(document)
+ for start_date_solver_decision in filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
+ solver_process.contentValues()):
+ # use StartDate Replacement Solver.
+ start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
+ # configure for Quantity Split Solver.
+ kw = {'value':document.getStartDate()}
+ start_date_solver_decision.updateConfiguration(**kw)
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+
+ def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None):
+ packing_list = sequence.get('packing_list')
+ self._unifyStartDateWithDecision(packing_list)
+
+ def stepUnifyStartDateWithDecisionInvoice(self,sequence=None, sequence_list=None):
+ invoice = sequence.get('invoice')
+ self._unifyStartDateWithDecision(invoice)
+
+ def stepUnifyStartDateWithPrevision(self,sequence=None, sequence_list=None):
+ """
+ Check if simulation movement are disconnected
+ """
+ packing_list = sequence.get('packing_list')
+ applied_rule = sequence.get('applied_rule')
+ simulation_line_list = applied_rule.objectValues()
+ start_date = simulation_line_list[-1].getStartDate()
+ solver_tool = self.portal.portal_solvers
+ solver_process = solver_tool.newSolverProcess(packing_list)
+ for start_date_solver_decision in filter(
+ lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
+ solver_process.contentValues()):
+ # use StartDate Replacement Solver.
+ start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
+ # configure for Quantity Split Solver.
+ kw = {'value':start_date}
+ start_date_solver_decision.updateConfiguration(**kw)
+ solver_process.buildTargetSolverList()
+ solver_process.solve()
+
+ def checkOrderRuleSimulation(self, rule_reference, sequence=None, sequence_list=None):
+ """
+ Test if simulation is matching order, be sure that rule_reference is used
+ to expand simulation for order
+ """
+ order = sequence.get('order')
+ related_applied_rule_list = order.getCausalityRelatedValueList( \
+ portal_type=self.applied_rule_portal_type)
+ no_applied_rule_state = ('draft', 'auto_planned')
+ order_state = order.getSimulationState()
+
+ if order_state in no_applied_rule_state:
+ self.assertEquals(0, len(related_applied_rule_list))
+ else:
+ LOG('stepCheckOrderRuleSimulation', 0, 'related_applied_rule_list: %s' %
+ str([x.getObject() for x in related_applied_rule_list]))
+ self.assertEquals(1, len(related_applied_rule_list))
+ applied_rule = related_applied_rule_list[0].getObject()
+ sequence.edit(applied_rule=applied_rule)
+ self.failUnless(applied_rule is not None)
+ self.failUnless(order_state, \
+ applied_rule.getLastExpandSimulationState())
+
+ # Test if applied rule has a specialise value with passed rule_reference
+ portal_rules = getToolByName(order, 'portal_rules')
+ self.assertEquals(rule_reference,
+ applied_rule.getSpecialiseReference())
+
+ simulation_movement_list = applied_rule.objectValues()
+ sequence.edit(simulation_movement_list=simulation_movement_list)
+
+ # Count the number of movement in order
+ movement_list = order.getMovementList()
+ # Check if number of unique movement is equal to number of
+ # simulation movement
+ unique_movement_list = dict(
+ [('%r,%r,%r' % (x.getResource(), x.getVariationCategoryList(),
+ x.getVariationPropertyDict())), x] for x in movement_list).values()
+ self.assertEquals(len(unique_movement_list),
+ len(simulation_movement_list))
+ # Check if all movements are related to simulation movements
+ order_movement_list = sum([x.getOrderValueList() for x in \
+ simulation_movement_list], [])
+ self.failIfDifferentSet(movement_list, order_movement_list)
+
+ # Check each simulation movement
+ for simulation_movement in simulation_movement_list:
+ order_movement_list = simulation_movement.getOrderValueList()
+ # Test quantity
+ self.assertEquals(sum([x.getQuantity() for x in order_movement_list]),
+ simulation_movement.getQuantity())
+ for order_movement in order_movement_list:
+ # Test price
+ self.assertEquals(order_movement.getPrice(), \
+ simulation_movement.getPrice())
+ # Test resource
+ self.assertEquals(order_movement.getResource(), \
+ simulation_movement.getResource())
+ # Test resource variation
+ self.assertEquals(order_movement.getVariationText(), \
+ simulation_movement.getVariationText())
+ self.assertEquals(order_movement.getVariationCategoryList(), \
+ simulation_movement.getVariationCategoryList())
+ # XXX Test acquisition
+ self.checkAcquisition(simulation_movement, order_movement)
+
+ def stepModifySimulationLineQuantityForMergedLine(self,sequence=None, sequence_list=None):
+ """
+ Check if simulation movement are disconnected
+ """
+ applied_rule = sequence.get('applied_rule')
+ simulation_line_list = applied_rule.objectValues()
+ self.assertEquals(len(simulation_line_list), 1)
+ for simulation_line in simulation_line_list:
+ simulation_line.edit(quantity=self.default_quantity-2)
+ simulation_line.getOrderValue().edit(quantity=self.default_quantity-2)
+
+ def stepCheckSimulationQuantityUpdatedForMergedLine(self,sequence=None, sequence_list=None):
+ """
+ Test if the quantity of the simulation movement was changed
+ """
+ applied_rule = sequence.get('applied_rule')
+ simulation_line_list = applied_rule.objectValues()
+ self.assertEquals(len(simulation_line_list), 1)
+ for simulation_line in simulation_line_list:
+ self.assertEquals(simulation_line.getQuantity() + \
+ simulation_line.getDeliveryError(),
+ self.default_quantity * 2)
+
+ def stepCheckPackingListLineWithDifferentResource(self,sequence=None, sequence_list=None):
+ """
+ Look if the packing list has new previsions
+ """
+ packing_list_line = sequence.get('packing_list_line')
+ new_resource = sequence.get('resource')
+ self.assertEquals(packing_list_line.getQuantity(), self.default_quantity*2)
+ self.assertEquals(packing_list_line.getResourceValue(), new_resource)
+ simulation_line_list = packing_list_line.getDeliveryRelatedValueList()
+ order_line_list = sum([x.getOrderList() for x in simulation_line_list], [])
+ self.assertEquals(sorted(packing_list_line.getCausalityList()),
+ sorted(order_line_list))
class TestERP5Simulation(TestERP5SimulationMixin, ERP5TypeTestCase):
run_all_test = 1
@@ -205,298 +596,14 @@
sequence_list.play(self, quiet=quiet)
class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList):
- def validateNewRules(self):
- # create an Order Rule document.
- portal_rules = self.portal.portal_rules
- new_order_rule = filter(
- lambda x:x.title == 'New Default Order Rule',
- portal_rules.objectValues(portal_type='Order Rule'))[0]
- if new_order_rule.getValidationState() != 'validated':
- new_order_rule.validate()
-
- def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None, **kw):
- """
- Solve quantity divergence by using solver tool.
- """
- packing_list = sequence.get('packing_list')
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- quantity_solver_decision = filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
- solver_process.contentValues())[0]
- # use Quantity Accept Solver.
- quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Accept Solver'])
- solver_process.buildTargetSolverList()
- solver_process.solve()
- # XXX-JPS We do not need the divergence message anymore.
- # since the divergence message == the divergence tester itself
- # with its title, description, tested property, etc.
-
- def stepAcceptDecisionResource(self,sequence=None, sequence_list=None, **kw):
- """
- Solve quantity divergence by using solver tool.
- """
- packing_list = sequence.get('packing_list')
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- resource_solver_decision = filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='resource',
- solver_process.contentValues())[0]
- # use Resource Replacement Solver.
- resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Replacement Solver'])
- solver_process.buildTargetSolverList()
- solver_process.solve()
-
- def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw):
- """
- Do the split and defer action
- """
- packing_list = sequence.get('packing_list')
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- quantity_solver_decision = filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
- solver_process.contentValues())[0]
- # use Quantity Split Solver.
- quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver'])
- # configure for Quantity Split Solver.
- kw = {'delivery_solver':'FIFO',
- 'start_date':self.datetime + 15,
- 'stop_date':self.datetime + 25}
- quantity_solver_decision.updateConfiguration(**kw)
- solver_process.buildTargetSolverList()
- solver_process.solve()
- # build split deliveries manually. XXX ad-hoc
- previous_tag = None
- for delivery_builder in packing_list.getBuilderList():
- this_builder_tag = '%s_split_%s' % (packing_list.getPath(),
- delivery_builder.getId())
- after_tag = []
- if previous_tag:
- after_tag.append(previous_tag)
- delivery_builder.activate(
- after_method_id=('solve',
- 'immediateReindexObject',
- 'recursiveImmediateReindexObject',), # XXX too brutal.
- after_tag=after_tag,
- ).build(explanation_uid=packing_list.getCausalityValue().getUid())
-
- def _adoptPrevisionQuantity(self, packing_list):
- """
- Solve quantity divergence by using solver tool.
- """
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- quantity_solver_decision = filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
- solver_process.contentValues())[0]
- # use Quantity Adoption Solver.
- quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Adoption Solver'])
- solver_process.buildTargetSolverList()
- solver_process.solve()
-
- def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None, **kw):
- """
- Solve quantity divergence by using solver tool.
- """
- packing_list = sequence.get('packing_list')
- self._adoptPrevisionQuantity(packing_list)
-
- def stepNewPackingListAdoptPrevisionQuantity(self, sequence=None,
- sequence_list=None, **kw):
- """
- Solve quantity divergence by using solver tool.
- """
- packing_list = sequence.get('new_packing_list')
- self._adoptPrevisionQuantity(packing_list)
-
- def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None, **kw):
- """
- Solve resource divergence by using solver tool.
- """
- packing_list = sequence.get('packing_list')
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- resource_solver_decision = filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='resource',
- solver_process.contentValues())[0]
- # use Resource Adopt Solver.
- resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Adoption Solver'])
- solver_process.buildTargetSolverList()
- solver_process.solve()
-
- def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None, **kw):
- """
- Look if the packing list has new previsions
- """
- old_packing_list_line = sequence.get('packing_list_line')
- packing_list_line = old_packing_list_line.aq_parent[str(int(old_packing_list_line.getId())-1)]
- resource = sequence.get('resource')
- for line in sequence.get('packing_list').getMovementList():
- self.assertEquals(line.getResourceValue(), resource)
- self.assertEquals(line.getQuantity(), self.default_quantity)
- self.assertEquals(line.getCausalityList(),
- [x.getOrder() for x in \
- line.getDeliveryRelatedValueList()])
-
- def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None, **kw):
- """
- Check if simulation movement are disconnected
- """
- packing_list = sequence.get('packing_list')
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- for destination_solver_decision in filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='destination',
- solver_process.contentValues()):
- # use Destination Replacement Solver.
- destination_solver_decision.setSolverValue(self.portal.portal_types['Destination Replacement Solver'])
- solver_process.buildTargetSolverList()
- solver_process.solve()
-
- def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None, **kw):
- """
- Check if simulation movement are disconnected
- """
- packing_list = sequence.get('packing_list')
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- for start_date_solver_decision in filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
- solver_process.contentValues()):
- # use StartDate Replacement Solver.
- start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
- # configure for Quantity Split Solver.
- kw = {'value':packing_list.getStartDate()}
- start_date_solver_decision.updateConfiguration(**kw)
- solver_process.buildTargetSolverList()
- solver_process.solve()
-
- def stepUnifyStartDateWithPrevision(self,sequence=None, sequence_list=None, **kw):
- """
- Check if simulation movement are disconnected
- """
- packing_list = sequence.get('packing_list')
- applied_rule = sequence.get('applied_rule')
- simulation_line_list = applied_rule.objectValues()
- start_date = simulation_line_list[-1].getStartDate()
- solver_tool = self.portal.portal_solvers
- solver_process = solver_tool.newSolverProcess(packing_list)
- for start_date_solver_decision in filter(
- lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
- solver_process.contentValues()):
- # use StartDate Replacement Solver.
- start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
- # configure for Quantity Split Solver.
- kw = {'value':start_date}
- start_date_solver_decision.updateConfiguration(**kw)
- solver_process.buildTargetSolverList()
- solver_process.solve()
-
- def checkOrderRuleSimulation(self, rule_reference, sequence=None, sequence_list=None, **kw):
- """
- Test if simulation is matching order, be sure that rule_reference is used
- to expand simulation for order
- """
- order = sequence.get('order')
- related_applied_rule_list = order.getCausalityRelatedValueList( \
- portal_type=self.applied_rule_portal_type)
- no_applied_rule_state = ('draft', 'auto_planned')
- order_state = order.getSimulationState()
-
- if order_state in no_applied_rule_state:
- self.assertEquals(0, len(related_applied_rule_list))
- else:
- LOG('stepCheckOrderRuleSimulation', 0, 'related_applied_rule_list: %s' %
- str([x.getObject() for x in related_applied_rule_list]))
- self.assertEquals(1, len(related_applied_rule_list))
- applied_rule = related_applied_rule_list[0].getObject()
- sequence.edit(applied_rule=applied_rule)
- self.failUnless(applied_rule is not None)
- self.failUnless(order_state, \
- applied_rule.getLastExpandSimulationState())
-
- # Test if applied rule has a specialise value with passed rule_reference
- portal_rules = getToolByName(order, 'portal_rules')
- self.assertEquals(rule_reference,
- applied_rule.getSpecialiseReference())
-
- simulation_movement_list = applied_rule.objectValues()
- sequence.edit(simulation_movement_list=simulation_movement_list)
-
- # Count the number of movement in order
- movement_list = order.getMovementList()
- # Check if number of unique movement is equal to number of
- # simulation movement
- unique_movement_list = dict(
- [('%r,%r,%r' % (x.getResource(), x.getVariationCategoryList(),
- x.getVariationPropertyDict())), x] for x in movement_list).values()
- self.assertEquals(len(unique_movement_list),
- len(simulation_movement_list))
- # Check if all movements are related to simulation movements
- order_movement_list = sum([x.getOrderValueList() for x in \
- simulation_movement_list], [])
- self.failIfDifferentSet(movement_list, order_movement_list)
-
- # Check each simulation movement
- for simulation_movement in simulation_movement_list:
- order_movement_list = simulation_movement.getOrderValueList()
- # Test quantity
- self.assertEquals(sum([x.getQuantity() for x in order_movement_list]),
- simulation_movement.getQuantity())
- for order_movement in order_movement_list:
- # Test price
- self.assertEquals(order_movement.getPrice(), \
- simulation_movement.getPrice())
- # Test resource
- self.assertEquals(order_movement.getResource(), \
- simulation_movement.getResource())
- # Test resource variation
- self.assertEquals(order_movement.getVariationText(), \
- simulation_movement.getVariationText())
- self.assertEquals(order_movement.getVariationCategoryList(), \
- simulation_movement.getVariationCategoryList())
- # XXX Test acquisition
- self.checkAcquisition(simulation_movement, order_movement)
-
- def stepModifySimulationLineQuantityForMergedLine(self,sequence=None, sequence_list=None, **kw):
- """
- Check if simulation movement are disconnected
- """
- applied_rule = sequence.get('applied_rule')
- simulation_line_list = applied_rule.objectValues()
- self.assertEquals(len(simulation_line_list), 1)
- for simulation_line in simulation_line_list:
- simulation_line.edit(quantity=self.default_quantity-2)
- simulation_line.getOrderValue().edit(quantity=self.default_quantity-2)
-
- def stepCheckSimulationQuantityUpdatedForMergedLine(self,sequence=None, sequence_list=None, **kw):
- """
- Test if the quantity of the simulation movement was changed
- """
- applied_rule = sequence.get('applied_rule')
- simulation_line_list = applied_rule.objectValues()
- self.assertEquals(len(simulation_line_list), 1)
- for simulation_line in simulation_line_list:
- self.assertEquals(simulation_line.getQuantity() + \
- simulation_line.getDeliveryError(),
- self.default_quantity * 2)
-
- def stepCheckPackingListLineWithDifferentResource(self,sequence=None, sequence_list=None, **kw):
- """
- Look if the packing list has new previsions
- """
- packing_list_line = sequence.get('packing_list_line')
- new_resource = sequence.get('resource')
- self.assertEquals(packing_list_line.getQuantity(), self.default_quantity*2)
- self.assertEquals(packing_list_line.getResourceValue(), new_resource)
- simulation_line_list = packing_list_line.getDeliveryRelatedValueList()
- order_line_list = sum([x.getOrderList() for x in simulation_line_list], [])
- self.assertEquals(sorted(packing_list_line.getCausalityList()),
- sorted(order_line_list))
+ pass
+
+class TestERP5SimulationInvoice(TestERP5SimulationMixin, TestSaleInvoice):
+ pass
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestERP5Simulation))
suite.addTest(unittest.makeSuite(TestERP5SimulationPackingList))
+ suite.addTest(unittest.makeSuite(TestERP5SimulationInvoice))
return suite
More information about the Erp5-report
mailing list