[Erp5-report] r27271 - in /erp5/trunk/products/ERP5: Document/ Interface/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri May 29 18:37:29 CEST 2009
Author: yusuke
Date: Fri May 29 18:37:29 2009
New Revision: 27271
URL: http://svn.erp5.org?rev=27271&view=rev
Log:
- modified TransformationRule to use Business Process
instead of Supply Chain.
- added/changed some features to support the above modification.
Added:
erp5/trunk/products/ERP5/tests/testMRP.py
Modified:
erp5/trunk/products/ERP5/Document/BusinessProcess.py
erp5/trunk/products/ERP5/Document/BusinessState.py
erp5/trunk/products/ERP5/Document/Transformation.py
erp5/trunk/products/ERP5/Document/TransformationRule.py
erp5/trunk/products/ERP5/Interface/IBusinessProcess.py
Modified: erp5/trunk/products/ERP5/Document/BusinessProcess.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/BusinessProcess.py?rev=27271&r1=27270&r2=27271&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Document/BusinessProcess.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Document/BusinessProcess.py [utf8] Fri May 29 18:37:29 2009
@@ -185,3 +185,11 @@
def isStopDateReferential(self):
return self.getReferentialDate() == 'stop_date'
+
+ def getTradePhaseList(self):
+ """
+ Returns all trade_phase of this business process
+ """
+ path_list = self.objectValues(portal_type=self.getPortalBusinessPathTypeList())
+ return filter(None, [path.getTradePhase()
+ for path in path_list])
Modified: erp5/trunk/products/ERP5/Document/BusinessState.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/BusinessState.py?rev=27271&r1=27270&r2=27271&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Document/BusinessState.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Document/BusinessState.py [utf8] Fri May 29 18:37:29 2009
@@ -161,8 +161,10 @@
"""
remaining_trade_phase_list = []
for path in self.getPredecessorRelatedValueList():
- if not (path.isCompleted(explanation) or
- path.isPartiallyCompleted(explanation)):
+ # XXX When no simulations related to path, what should path.isCompleted return?
+ # if True we don't have way to add remaining trade phases to new movement
+ if not (path._getRelatedSimulationMovementList(explanation) and
+ path.isCompleted(explanation)):
remaining_trade_phase_list += path.getTradePhaseValueList()
# collect to successor direction recursively
Modified: erp5/trunk/products/ERP5/Document/Transformation.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/Transformation.py?rev=27271&r1=27270&r2=27271&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Document/Transformation.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Document/Transformation.py [utf8] Fri May 29 18:37:29 2009
@@ -41,6 +41,8 @@
from Products.CMFCategory.Renderer import Renderer
from Products.ERP5.AggregatedAmountList import AggregatedAmountList
+
+from zLOG import LOG, WARNING
class Transformation(XMLObject, Predicate, Variated):
"""
@@ -223,7 +225,9 @@
security.declareProtected(Permissions.AccessContentsInformation,
'getAggregatedAmountList')
def getAggregatedAmountList(self, context=None, REQUEST=None,
- ind_phase_url_list=None,
+ trade_phase_list=None,
+ # obsolete, use trade_phase_list instead
+ ind_phase_url_list=None,
rejected_resource_uid_list=None,
context_quantity=0,**kw):
"""
@@ -247,8 +251,14 @@
transformation_line_list = []
for transformation in ([self]+template_transformation_list):
transformation_line_list.extend(transformation.objectValues())
+ # Get only lines related to a precise trade_phase
+ if trade_phase_list is not None:
+ transformation_line_list = filter(
+ lambda line: line.getTradePhase() in trade_phase_list,
+ transformation_line_list)
# Get only lines related to a precise industrial_phase
if ind_phase_url_list is not None:
+ LOG("Transformation", WARNING, "ind_phase_list is obsolete")
new_transf_line_list = []
for line in transformation_line_list:
ind_ph = line.getIndustrialPhaseValue()
Modified: erp5/trunk/products/ERP5/Document/TransformationRule.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Document/TransformationRule.py?rev=27271&r1=27270&r2=27271&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Document/TransformationRule.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Document/TransformationRule.py [utf8] Fri May 29 18:37:29 2009
@@ -1,6 +1,7 @@
+# -*- coding:utf-8 -*-
##############################################################################
#
-# Copyright (c) 2002, 2005 Nexedi SARL and Contributors. All Rights Reserved.
+# Copyright (c) 2002-2009 Nexedi SARL and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp at nexedi.com>
# Romain Courteaud <romain at nexedi.com>
#
@@ -27,298 +28,278 @@
#
##############################################################################
+from ExtensionClass import Base
from AccessControl import ClassSecurityInfo
from Acquisition import aq_base, aq_parent, aq_inner, aq_acquire
from Products.CMFCore.utils import getToolByName
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Document.Rule import Rule
+from Products.ERP5.Document.SimulationMovement import SimulationMovement
from Products.ERP5Type.Errors import TransformationRuleError
-from Products.ERP5.Document.TransformationSourcingRule import\
- TransformationSourcingRuleMixin
-
-from zLOG import LOG
-
-class TransformationRule(TransformationSourcingRuleMixin, Rule):
- """
- Order Rule object make sure an Order in the similation
- is consistent with the real order
- """
- # CMF Type Definition
- meta_type = 'ERP5 Transformation Rule'
- portal_type = 'Transformation Rule'
- # Declarative security
- security = ClassSecurityInfo()
- security.declareObjectProtected(Permissions.AccessContentsInformation)
- __implements__ = ( Interface.Predicate,
- Interface.Rule )
- # Default Properties
- property_sheets = ( PropertySheet.Base
+
+class TransformationRuleMixin(Base):
+ security = ClassSecurityInfo()
+
+ security.declareProtected(Permissions.View, 'getTransformation')
+ def getTransformation(self, movement=None, applied_rule=None):
+ """
+ Return transformation related to used by the applied rule.
+ """
+ if movement is None and applied_rule is not None:
+ movement = applied_rule.getParentValue()
+
+ order_movement = movement.getRootSimulationMovement().getOrderValue()
+ explanation = self.getExplanation(movement=movement,
+ applied_rule=applied_rule)
+ # find line recursively
+ order_line = order_movement
+ while order_line.getParentValue() != explanation:
+ order_line = order_line.getParentValue()
+
+ script = order_line._getTypeBasedMethod('_getTransformation')
+ if script is not None:
+ transformation = script()
+ else:
+ line_transformation = order_line.objectValues(
+ portal_type=self.getPortalTransformationTypeList())
+ if len(line_transformation) == 1:
+ transformation = line_transformation[0]
+ else:
+ transformation = order_line.getSpecialiseValue(
+ portal_type=self.getPortalTransformationTypeList())
+
+ if transformation.getResource() == movement.getResource():
+ return transformation
+
+ security.declareProtected(Permissions.View, 'getBusinessProcess')
+ def getBusinessProcess(self, **kwargs):
+ """
+ Return business process related to root causality.
+ """
+ explanation = self.getExplanation(**kwargs)
+ if explanation is not None:
+ specialise = explanation.getSpecialiseValue()
+ business_process_type_list = self.getPortalBusinessProcessTypeList()
+ # because trade condition can be specialised
+ while specialise is not None and \
+ specialise.getPortalType() not in business_process_type_list:
+ specialise = specialise.getSpecialiseValue()
+ return specialise
+
+ security.declareProtected(Permissions.View, 'getRootExplanation')
+ def getRootExplanation(self, business_process):
+ """
+ the method of ProductionOrderRule returns most tail path of business process
+ """
+ if business_process is not None:
+ for business_path in business_process.contentValues(
+ portal_type=self.getPortalBusinessPathTypeList()):
+ if business_path.isDeliverable():
+ return business_path
+
+ security.declareProtected(Permissions.View, 'getExplanation')
+ def getExplanation(self, movement=None, applied_rule=None):
+ if applied_rule is not None:
+ return applied_rule.getRootAppliedRule().getCausalityValue()
+ else:
+ return movement.getRootSimulationMovement()\
+ .getOrderValue().getExplanationValue()
+
+
+class TransformationRule(TransformationRuleMixin, Rule):
+ """
+ """
+
+ # CMF Type Definition
+ meta_type = 'ERP5 Transformation Rule'
+ portal_type = 'Transformation Rule'
+ # Declarative security
+ security = ClassSecurityInfo()
+ security.declareObjectProtected(Permissions.AccessContentsInformation)
+
+ __implements__ = ( Interface.Predicate,
+ Interface.Rule )
+ # Default Properties
+ property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
, PropertySheet.Task
)
- # Class variable
- simulation_movement_portal_type = "Simulation Movement"
-
- # Simulation workflow
- security.declareProtected(Permissions.ModifyPortalContent, 'expand')
- def expand(self, applied_rule, **kw):
+
+ def getHeadProductionPathList(self, transformation, business_process):
+ """
+ Return list of path which is head of transformation trade_phases
+
+ this method assumes trade_phase of head paths is only one
+ """
+ production_trade_phase_set = set([amount.getTradePhase()
+ for amount in transformation\
+ .objectValues(portal_type='Transformation Transformed Resource')])
+ head_path_list = []
+ for state in business_process.objectValues(
+ portal_type=self.getPortalBusinessStateTypeList()):
+ if len(state.getSuccessorRelatedValueList()) == 0:
+ head_path_list.extend(state.getPredecessorRelatedValueList())
+
+ result_list = []
+ for path in head_path_list:
+ result_list += self._getHeadPathByTradePhaseList(path, production_trade_phase_set)
+
+ return map(lambda t: t[0], filter(lambda t: t != (None, None), result_list))
+
+ def _getHeadPathByTradePhaseList(self, path, trade_phase_set):
+ _set = set(path.getTradePhaseList())
+ if _set & trade_phase_set:
+ return [(path, _set & trade_phase_set)]
+
+ successor_node = path.getSuccessorValue()
+ if successor_node is None:
+ return [(None, None)]
+
+ _list = []
+ for next_path in successor_node.getPredecessorRelatedValueList():
+ _list += self._getHeadPathByTradePhaseList(next_path, trade_phase_set)
+ return _list
+
+ security.declareProtected(Permissions.ModifyPortalContent, 'expand')
+ def expand(self, applied_rule, **kw):
+ """
+ """
+
+ parent_movement = applied_rule.getParentValue()
+
+ transformation = self.getTransformation(movement=parent_movement)
+ business_process = self.getBusinessProcess(movement=parent_movement)
+ explanation = self.getExplanation(movement=parent_movement)
+
+ # get all trade_phase of the Business Process
+ trade_phase_list = business_process.getTradePhaseList()
+
+ # get head of production path from business process with trade_phase_list
+ head_production_path_list = self.getHeadProductionPathList(transformation,
+ business_process)
+
+ product_resource = transformation.getResource()
+ product_quantity = parent_movement.getNetQuantity()
+ product_quantity_unit = parent_movement.getQuantityUnit()
+ product_variation_category_list = parent_movement.getVariationCategoryList()
+ product_variation_property_dict = parent_movement.getVariationPropertyDict()
+
+ amount_dict = {}
+ # XXX Transformation.getAggregatedAmountList is useless, it can not have trade_phase, because Amout.
+ for amount in transformation.objectValues(portal_type='Transformation Transformed Resource'):
+ phase = amount.getTradePhase()
+ amount_dict.setdefault(phase, [])
+ amount_dict[phase].append(amount)
+
+ product_destination = None
+ for (phase, amount_list) in amount_dict.items():
+ if phase not in trade_phase_list:
+ raise TransformationRuleError,\
+ "Trade phase %r is not part of Business Process %r" % (phase, business_process)
+
+ phase_path_list = business_process.getPathValueList(phase)
"""
- Expands the current movement downward.
- -> new status -> expanded
- An applied rule can be expanded only if its parent movement
- is expanded.
+ XXX: In this context, we assume quantity as ratio,
+ but this notion is consistent with transformation.
"""
- parent_movement = applied_rule.getParentValue()
- # Get production node and production section
- production = parent_movement.getSource()
- production_section = parent_movement.getSourceSection()
- # Get the current supply link used to calculate consumed resource
- # The current supply link is calculated from the parent AppliedRule.
- supply_chain = self.getSupplyChain(parent_movement.getParentValue())
- parent_supply_link = self.getCurrentSupplyLink(parent_movement)
- current_supply_link_list = supply_chain.\
- getPreviousProductionSupplyLinkList(parent_supply_link)
- if len(current_supply_link_list) != 1:
- # We shall no pass here.
- # The test method returned a wrong value !
+ if sum(map(lambda path: path.getQuantity(), phase_path_list)) != 1:
raise TransformationRuleError,\
- "Expand must not be called on %r" %\
- applied_rule.getRelativeUrl()
- else:
- current_supply_link = current_supply_link_list[0]
- # Generate produced movement
- movement_dict = self._expandProducedResource(applied_rule,
- production,
- production_section,
- current_supply_link)
- # Generate consumed movement
- consumed_mvt_dict = self._expandConsumedResource(applied_rule,
- production,
- production_section,
- current_supply_link)
- movement_dict.update(consumed_mvt_dict)
- # Finally, build movement
- self._buildMovementList(applied_rule, movement_dict, **kw)
- # Expand each movement created
- Rule.expand(self, applied_rule, **kw)
-
- def _expandProducedResource(self, applied_rule, production,
- production_section, current_supply_link):
- """
- Produced resource.
- Create a movement for the resource produced by the transformation.
- Only one produced movement can be created.
- """
- parent_movement = applied_rule.getParentValue()
- stop_date = parent_movement.getStartDate()
- produced_movement_dict = {
- 'pr': {
- "resource": parent_movement.getResource(),
- # XXX what is lost quantity ?
- "quantity": parent_movement.getQuantity(),# + lost_quantity,
- "quantity_unit": parent_movement.getQuantityUnit(),
- "variation_category_list":\
- parent_movement.getVariationCategoryList(),
- "variation_property_dict": \
- parent_movement.getVariationPropertyDict(),
- "source_list": (),
- "source_section_list": (),
- "destination": production,
- "destination_section": production_section,
- "deliverable": 1,
- 'start_date': current_supply_link.calculateStartDate(stop_date),
- 'stop_date': stop_date,
- 'causality_value': current_supply_link,
- }
- }
- return produced_movement_dict
-
- def _expandConsumedResource(self, applied_rule, production,
- production_section, current_supply_link):
- """
- Consumed resource.
- Create a movement for each resource consumed by the transformation,
- and for the previous variation of the produced resource.
- """
- # Calculate all consumed resource
- # Store each value in a dictionnary before created them.
- # { movement_id: {property_name: property_value,} ,}
- consumed_movement_dict = {}
- parent_movement = applied_rule.getParentValue()
- supply_chain = self.getSupplyChain(parent_movement.getParentValue())
- # Consumed previous variation
- previous_variation_dict = self._expandConsumedPreviousVariation(
- applied_rule,
- production,
- production_section,
- supply_chain,
- current_supply_link)
- consumed_movement_dict.update(previous_variation_dict)
- # Consumed raw materials
- raw_material_dict = self._expandConsumedRawMaterials(
- applied_rule,
- production,
- production_section,
- supply_chain,
- current_supply_link)
- consumed_movement_dict.update(raw_material_dict)
- return consumed_movement_dict
-
- def _expandConsumedPreviousVariation(self, applied_rule, production,
- production_section, supply_chain,
- current_supply_link):
- """
- Create a movement for the previous variation of the produced resource.
- """
- id_count = 1
- consumed_movement_dict = {}
- parent_movement = applied_rule.getParentValue()
- # Calculate the variation category list of parent movement
- base_category_list = parent_movement.getVariationBaseCategoryList()
- if "industrial_phase" in base_category_list:
- # We do not want to get the industrial phase variation
- base_category_list.remove("industrial_phase")
- category_list = parent_movement.getVariationCategoryList(
- base_category_list=base_category_list)
- # Calculate the previous variation
- for previous_supply_link in supply_chain.\
- getPreviousSupplyLinkList(current_supply_link):
- previous_ind_phase_list = supply_chain.\
- getPreviousProductionIndustrialPhaseList(previous_supply_link,
- all=1)
- if previous_ind_phase_list != []:
- # Industrial phase is a category
- ind_phase_list = [x.getRelativeUrl() for x in \
- previous_ind_phase_list]
- consumed_mvt_id = "%s_%s" % ("mr", id_count)
- id_count += 1
- stop_date = parent_movement.getStartDate()
- consumed_movement_dict[consumed_mvt_id] = {
- 'start_date': current_supply_link.calculateStartDate(stop_date),
- 'stop_date': stop_date,
- "resource": parent_movement.getResource(),
- # XXX Is the quantity value correct ?
- "quantity": parent_movement.getNetQuantity(), # getNetQuantity to support efficency from transformation
- "quantity_unit": parent_movement.getQuantityUnit(),
- "destination_list": (),
- "destination_section_list": (),
- "source": production,
- "source_section": production_section,
- "deliverable": 1,
- "variation_category_list": category_list+ind_phase_list,
- "variation_property_dict": \
- parent_movement.getVariationPropertyDict(),
- 'causality_value': current_supply_link,
- }
- return consumed_movement_dict
-
- def _expandConsumedRawMaterials(self, applied_rule, production,
- production_section, supply_chain,
- current_supply_link):
- """
- Create a movement for each resource consumed by the transformation,
- """
- parent_movement = applied_rule.getParentValue()
- # Calculate the context for getAggregatedAmountList
- base_category_list = parent_movement.getVariationBaseCategoryList()
- if "industrial_phase" in base_category_list:
- # We do not want to get the industrial phase variation
- base_category_list.remove("industrial_phase")
- category_list = parent_movement.getVariationCategoryList(
- base_category_list=base_category_list)
- # Get the transformation to use
- transformation = self.getTransformation(applied_rule)
- # Generate the fake context
- tmp_context = parent_movement.asContext(
- context=parent_movement,
- REQUEST={'categories':category_list})
- # Calculate the industrial phase list
- previous_ind_phase_list = supply_chain.\
- getPreviousPackingListIndustrialPhaseList(current_supply_link)
- ind_phase_id_list = [x.getRelativeUrl() for x in previous_ind_phase_list]
- # Call getAggregatedAmountList
- # XXX expand failed if transformation is not defined.
- # Do we need to catch the exception ?
- amount_list = transformation.getAggregatedAmountList(
- tmp_context,
- ind_phase_url_list=ind_phase_id_list)
- # Add entries in the consumed_movement_dict
- consumed_movement_dict = {}
- for amount in amount_list:
- consumed_mvt_id = "%s_%s" % ("cr", amount.getId())
- stop_date = parent_movement.getStartDate()
- resource_price = amount.getResourcePrice()
- price = None
- if resource_price is not None:
- price = amount.getNetQuantity() * resource_price # getNetQuantity to support efficency from transformation
- consumed_movement_dict[consumed_mvt_id] = {
- 'start_date': current_supply_link.calculateStartDate(stop_date),
- 'stop_date': stop_date,
- "resource": amount.getResource(),
- "variation_category_list":\
- amount.getVariationCategoryList(),
- "variation_property_dict": \
- amount.getVariationPropertyDict(),
- "quantity": amount.getNetQuantity() * parent_movement.getQuantity(), # getNetQuantity to support efficency from transformation
- "price": price,
- "quantity_unit": amount.getQuantityUnit(),
- "destination_list": (),
- "destination_section_list": (),
- "source": production,
- "source_section": production_section,
- "deliverable": 1,
- 'causality_value': current_supply_link,
- }
- return consumed_movement_dict
-
- security.declareProtected(Permissions.ModifyPortalContent, 'solve')
- def solve(self, applied_rule, solution_list):
- """
- Solve inconsistency according to a certain number of solutions
- templates. This updates the
-
- -> new status -> solved
-
- This applies a solution to an applied rule. Once
- the solution is applied, the parent movement is checked.
- If it does not diverge, the rule is reexpanded. If not,
- diverge is called on the parent movement.
- """
-
- security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
- def diverge(self, applied_rule):
- """
- -> new status -> diverged
-
- This basically sets the rule to "diverged"
- and blocks expansion process
- """
-
-# # Solvers
-# security.declareProtected(Permissions.View, 'isDivergent')
-# def isDivergent(self, applied_rule):
-# """
-# Returns 1 if divergent rule
-# """
-#
-# security.declareProtected(Permissions.View, 'getDivergenceList')
-# def getDivergenceList(self, applied_rule):
-# """
-# Returns a list Divergence descriptors
-# """
-#
-# security.declareProtected(Permissions.View, 'getSolverList')
-# def getSolverList(self, applied_rule):
-# """
-# Returns a list Divergence solvers
-# """
-
- # Deliverability / orderability
- def isDeliverable(self, m):
- return 1
- def isOrderable(self, m):
- return 0
-
+ "sum ratio of Trade Phase %r of Business Process %r is not one" % (phase, business_process)
+
+ for path in phase_path_list:
+ start_date = path.getExpectedStartDate(explanation)
+ stop_date = path.getExpectedStopDate(explanation)
+ predecessor_remaining_phase_list = path.getPredecessorValue()\
+ .getRemainingTradePhaseList(explanation,
+ trade_phase_list=trade_phase_list)
+ successor_remaining_phase_list = path.getSuccessorValue()\
+ .getRemainingTradePhaseList(explanation,
+ trade_phase_list=trade_phase_list)
+ if len(successor_remaining_trade_phase_list) == 0:
+ """
+ Destinations of last paths for transformation must be same,
+ because paths for transformation must be integrated finally,
+
+ valid graph
+ a --
+ \--
+ X- b
+ /--
+ c --
+
+ invalid graph
+ a ------- b
+
+ c ------- d
+ """
+ if product_destination is None:
+ product_destination = path.getDestination()
+ if product_destination != path.getDestination():
+ raise TransformationRuleError,\
+ "Transformation %r is not integrated on Business Process %r" % (transformation, business_process)
+ else:
+ # partial product movement
+ movement = applied_rule.newContent(portal_type="Simulation Movement")
+ movement.edit(causality_value=path,
+ start_date=path.getExpectedStartDate(explanation),
+ stop_date=path.getExpectedStopDate(explanation),
+ resource=product_resource,
+ quantity=-(product_quantity * path.getQuantity()),
+ quantity_unit=product_quantity_unit,
+ variation_category_list=product_variation_category_list,
+ variation_property_dict=product_variation_property_dict,
+ destination=path.getDestination(),
+ #destination_section=???,
+ trade_phase_value_list=successor_remaining_trade_phase_list)
+
+ # when the path is part of production but not first, consume previous partial product
+ if path not in head_production_path_list:
+ # consumed partial product movement
+ movement = applied_rule.newContent(portal_type="Simulation Movement")
+ movement.edit(causality_value=path,
+ start_date=start_date,
+ stop_date=stop_date,
+ resource=product_resource,
+ quantity=product_quantity * path.getQuantity(),
+ quantity_unit=product_quantity_unit,
+ variation_category_list=product_variation_category_list,
+ variation_property_dict=product_variation_property_dict,
+ source=path.getSource(),
+ #source_section=???,
+ trade_phase_value_list=predecessor_remaining_trade_phase_list)
+
+ # consumption movement
+ for amount in amount_list:
+ consumed_resource = amount.getResource()
+ consumed_quantity = product_quantity * amount.getQuantity() / amount.getEfficiency()
+ consumed_quantity_unit = amount.getQuantityUnit()
+
+ # consume resource
+ movement = applied_rule.newContent(portal_type="Simulation Movement")
+ movement.edit(causality_value=path,
+ start_date=start_date,
+ stop_date=stop_date,
+ resource=consumed_resource,
+ quantity=consumed_quantity * path.getQuantity(),
+ quantity_unit=consumed_quantity_unit,
+ source=path.getSource(),
+ #source_section=???,
+ trade_phase=path.getTradePhase())
+
+ # product movement
+ movement = applied_rule.newContent(portal_type="Simulation Movement")
+ movement.edit(start_date=path.getExpectedStartDate(explanation),
+ stop_date=path.getExpectedStopDate(explanation),
+ resource=product_resource,
+ quantity=-(product_quantity),
+ quantity_unit=product_quantity_unit,
+ variation_category_list=product_variation_category_list,
+ variation_property_dict=product_variation_property_dict,
+ destination=product_destination,
+ #destination_section=???,
+ )
+
+ Rule.expand(self, applied_rule, **kw)
Modified: erp5/trunk/products/ERP5/Interface/IBusinessProcess.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/Interface/IBusinessProcess.py?rev=27271&r1=27270&r2=27271&view=diff
==============================================================================
--- erp5/trunk/products/ERP5/Interface/IBusinessProcess.py [utf8] (original)
+++ erp5/trunk/products/ERP5/Interface/IBusinessProcess.py [utf8] Fri May 29 18:37:29 2009
@@ -85,3 +85,7 @@
'explanation' is the Order or Item or Document which is the
cause of a root applied rule in the simulation
"""
+
+ def getTradePhaseList(self):
+ """Returns list of all trade_phase of this Business Process
+ """
Added: erp5/trunk/products/ERP5/tests/testMRP.py
URL: http://svn.erp5.org/erp5/trunk/products/ERP5/tests/testMRP.py?rev=27271&view=auto
==============================================================================
--- erp5/trunk/products/ERP5/tests/testMRP.py (added)
+++ erp5/trunk/products/ERP5/tests/testMRP.py [utf8] Fri May 29 18:37:29 2009
@@ -1,0 +1,284 @@
+# -*- coding: utf-8 -*-
+##############################################################################
+# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
+# Yusuke Muraoka <yusuke at nexedi.com>
+#
+# WARNING: This program as such is intended to be used by professional
+# programmers who take the whole responsibility of assessing all potential
+# consequences resulting from its eventual inadequacies and bugs
+# End users who are looking for a ready-to-use solution with commercial
+# guarantees and support are strongly advised to contract a Free Software
+# Service Company
+#
+# This program is Free Software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##############################################################################
+
+import unittest
+import transaction
+
+from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
+from AccessControl.SecurityManagement import newSecurityManager
+from DateTime import DateTime
+
+from Products.ERP5Type.tests.Sequence import SequenceList
+from Products.CMFCore.utils import getToolByName
+from Products.ERP5Type.tests.utils import reindex
+
+from Products.ERP5.Document.TransformationRule import TransformationRule
+
+from Products.ERP5.tests.testBPMCore import TestBPMMixin
+
+class TestMRPMixin(TestBPMMixin):
+ transformation_portal_type = 'Transformation'
+ transformed_resource_portal_type = 'Transformation Transformed Resource'
+ product_portal_type = 'Product'
+
+ def setUpOnce(self):
+ self.portal = self.getPortalObject()
+
+ def invalidateRules(self):
+ """
+ do reversely of validateRules
+ """
+ rule_tool = self.getRuleTool()
+ for rule in rule_tool.contentValues(
+ portal_type=rule_tool.getPortalRuleTypeList()):
+ rule.invalidate()
+
+ def createTransformation(self):
+ module = self.portal.getDefaultModule(
+ portal_type=self.transformation_portal_type)
+ return module.newContent(portal_type=self.transformation_portal_type)
+
+ def createTransformedResource(self, transformation=None):
+ if transformation is None:
+ transformation = self.createTransformation()
+ return transformation.newContent(
+ portal_type=self.transformed_resource_portal_type)
+
+ @reindex
+ def createCategories(self):
+ category_tool = getToolByName(self.portal, 'portal_categories')
+ self.createCategoriesInCategory(category_tool.base_amount, ['weight'])
+ self.createCategoriesInCategory(category_tool.base_amount.weight, ['kg'])
+ self.createCategoriesInCategory(category_tool.trade_phase, ['mrp',])
+ self.createCategoriesInCategory(category_tool.trade_phase.mrp,
+ ['p' + str(i) for i in range(5)]) # phase0 ~ 4
+
+ def createProduct(self):
+ module = self.portal.getDefaultModule(
+ portal_type=self.product_portal_type)
+ return module.newContent(portal_type=self.product_portal_type)
+
+ @reindex
+ def createDefaultTransformation(self):
+ resource1 = self.createProduct()
+ resource2 = self.createProduct()
+ resource3 = self.createProduct()
+ resource4 = self.createProduct()
+ resource5 = self.createProduct()
+ transformation = self.createTransformation()
+ amount1 = self.createTransformedResource(transformation=transformation)
+ amount2 = self.createTransformedResource(transformation=transformation)
+ amount3 = self.createTransformedResource(transformation=transformation)
+ amount4 = self.createTransformedResource(transformation=transformation)
+
+ resource1.edit(title='product', quantity_unit_list=['weight/kg'])
+ resource2.edit(title='triangle', quantity_unit_list=['weight/kg'])
+ resource3.edit(title='box', quantity_unit_list=['weight/kg'])
+ resource4.edit(title='circle', quantity_unit_list=['weight/kg'])
+ resource5.edit(title='banana', quantity_unit_list=['weight/kg'])
+
+ transformation.edit(resource_value=resource1)
+ amount1.edit(resource_value=resource2, quantity=3,
+ quantity_unit_list=['weight/kg'], trade_phase='mrp/p2')
+ amount2.edit(resource_value=resource3, quantity=1,
+ quantity_unit_list=['weight/kg'], trade_phase='mrp/p2')
+ amount3.edit(resource_value=resource4, quantity=4,
+ quantity_unit_list=['weight/kg'], trade_phase='mrp/p3')
+ amount4.edit(resource_value=resource5, quantity=1,
+ quantity_unit_list=['weight/kg'], trade_phase='mrp/p3')
+ return transformation
+
+ @reindex
+ def createSimpleBusinessProcess(self):
+ """ mrp/p2 mrp/3
+ ready -------- partial_produced ------- done
+ """
+ business_process = self.createBusinessProcess()
+ business_path_p2 = self.createBusinessPath(business_process)
+ business_path_p3 = self.createBusinessPath(business_process)
+ business_state_ready = self.createBusinessState(business_process)
+ business_state_partial = self.createBusinessState(business_process)
+ business_state_done = self.createBusinessState(business_process)
+
+ business_process.edit(referential_date='stop_date')
+ business_path_p2.edit(id='p2',
+ predecessor_value=business_state_ready,
+ successor_value=business_state_partial,
+ quantity=1,
+ trade_phase=['mrp/p2'])
+ business_path_p3.edit(id='p3',
+ predecessor_value=business_state_partial,
+ successor_value=business_state_done,
+ quantity=1,
+ deliverable=1, # root explanation
+ trade_phase=['mrp/p3'])
+ return business_process
+
+ @reindex
+ def createConcurrentBusinessProcess(self):
+ """ mrp/p2
+ ready ======== partial_produced
+ mrp/p3
+ """
+ business_process = self.createBusinessProcess()
+ business_path_p2 = self.createBusinessPath(business_process)
+ business_path_p3 = self.createBusinessPath(business_process)
+ business_state_ready = self.createBusinessState(business_process)
+ business_state_partial = self.createBusinessState(business_process)
+
+ business_process.edit(referential_date='stop_date')
+ business_path_p2.edit(id='p2',
+ predecessor_value=business_state_ready,
+ successor_value=business_state_partial,
+ quantity=1,
+ trade_phase=['mrp/p2'])
+ business_path_p3.edit(id='p3',
+ predecessor_value=business_state_ready,
+ successor_value=business_state_partial,
+ quantity=1,
+ deliverable=1, # root explanation
+ trade_phase=['mrp/p3'])
+ return business_process
+
+class TestMRPImplementation(TestMRPMixin, ERP5TypeTestCase):
+ """the test for implementation"""
+ def test_TransformationRule_getHeadProductionPathList(self):
+ rule = self.portal.portal_rules.default_transformation_rule
+
+ transformation = self.createDefaultTransformation()
+
+ business_process = self.createSimpleBusinessProcess()
+ self.assertEquals([business_process.p2],
+ rule.getHeadProductionPathList(transformation, business_process))
+
+ business_process = self.createConcurrentBusinessProcess()
+ self.assertEquals(set([business_process.p2, business_process.p3]),
+ set(rule.getHeadProductionPathList(transformation, business_process)))
+
+ def test_TransformationRule_expand(self):
+ transformation = self.createDefaultTransformation()
+
+ """
+ Simple case
+ """
+ business_process = self.createSimpleBusinessProcess()
+
+ # mock order
+ order = self.portal.production_order_module.newContent(portal_type="Production Order")
+ order_line = order.newContent(portal_type="Production Order Line")
+
+ base_date = DateTime()
+ order.edit(specialise_value=business_process,
+ start_date=base_date,
+ stop_date=base_date+3,
+ source_section_value=order,
+ source_value=order)
+ order_line.edit(quantity=10)
+ order_line.setSpecialiseValue(transformation) # XXX Why can not define by edit?
+
+ # don't need another rules, just need TransformationRule for test
+ self.invalidateRules()
+
+ self.stepTic()
+
+ # alter simulations of the order
+ # root
+ applied_rule = self.portal.portal_simulation.newContent(portal_type='Applied Rule')
+ movement = applied_rule.newContent(portal_type='Simulation Movement')
+ applied_rule.edit(causality_value=order)
+ movement.edit(order_value=order_line,
+ quantity=order_line.getQuantity(),
+ resource=transformation.getResource())
+ # test mock
+ applied_rule = movement.newContent(potal_type='Applied Rule')
+
+ rule = self.portal.portal_rules.default_transformation_rule
+ rule.expand(applied_rule)
+
+ # assertion
+ expected_value_set = set([
+ ('business_process_module/1/p2', 'product_module/1', 'mrp/p3', -10),
+ ('business_process_module/1/p2', 'product_module/2', 'mrp/p2', 30),
+ ('business_process_module/1/p2', 'product_module/3', 'mrp/p2', 10),
+ ('business_process_module/1/p3', 'product_module/1', 'mrp/p3', 10),
+ ('business_process_module/1/p3', 'product_module/4', 'mrp/p3', 40),
+ ('business_process_module/1/p3', 'product_module/5', 'mrp/p3', 10),
+ (None, 'product_module/1', None, -10)])
+ movement_list = applied_rule.objectValues()
+ self.assertEquals(len(expected_value_set), len(movement_list))
+ movement_value_set = set([])
+ for movement in movement_list:
+ movement_value_set |= set([(movement.getCausality(),
+ movement.getResource(),
+ movement.getTradePhase(),
+ movement.getQuantity())])
+ self.assertEquals(expected_value_set, movement_value_set)
+
+ """
+ Concurrent case
+ """
+ business_process = self.createConcurrentBusinessProcess()
+ order.edit(specialise_value=business_process)
+
+ self.stepTic()
+
+ # alter simulations of the order
+ # root
+ applied_rule = self.portal.portal_simulation.newContent(portal_type='Applied Rule')
+ movement = applied_rule.newContent(portal_type='Simulation Movement')
+ applied_rule.edit(causality_value=order)
+ movement.edit(order_value=order_line,
+ quantity=order_line.getQuantity(),
+ resource=transformation.getResource())
+ # test mock
+ applied_rule = movement.newContent(potal_type='Applied Rule')
+
+ rule = self.portal.portal_rules.default_transformation_rule
+ rule.expand(applied_rule)
+
+ # assertion
+ expected_value_set = set([
+ ('business_process_module/2/p2', 'product_module/2', 'mrp/p2', 30),
+ ('business_process_module/2/p2', 'product_module/3', 'mrp/p2', 10),
+ ('business_process_module/2/p3', 'product_module/4', 'mrp/p3', 40),
+ ('business_process_module/2/p3', 'product_module/5', 'mrp/p3', 10),
+ (None, 'product_module/1', None, -10)])
+ movement_list = applied_rule.objectValues()
+ self.assertEquals(len(expected_value_set), len(movement_list))
+ movement_value_set = set([])
+ for movement in movement_list:
+ movement_value_set |= set([(movement.getCausality(),
+ movement.getResource(),
+ movement.getTradePhase(),
+ movement.getQuantity())])
+ self.assertEquals(expected_value_set, movement_value_set)
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestMRPImplementation))
+ return suite
More information about the Erp5-report
mailing list