Source code for moddy.fsm_part

:mod:`fsm_part` -- A moddy part with a state machine

.. module:: fsm_part
   :synopsis: A moddy part with a state machine
.. moduleauthor:: Klaus Popp <>

from .sim_part import SimPart

[docs]class SimFsmPart(SimPart): """ A moddy part with a state machine All simulator events are directed to the fsm. :param sim: Simulator instance :param obj_name: part name :param Fsm fsm: the state machine (Fsm class) object created by caller :param parent_obj: parent part. None if part has no parent. Defaults to None :param dict statusBoxReprMap: defines how each state is shown in sequence diagrams' status boxes Must be a dictionary with the state names as keys. The values must be a tuple of * text to display (None if org. state name shall be used) * appearance: The colors of the status box, see setStatIndication() """ def __init__( self, sim, obj_name, fsm, parent_obj=None, status_box_repr_map=None ): # pylint: disable=too-many-arguments super().__init__(sim=sim, obj_name=obj_name, parent_obj=parent_obj) # reference the fsm self.fsm = fsm fsm.the_moddy_part = self # register state change callback fsm.set_state_change_callback(self.fsm_state_change) self._status_box_repr_map = status_box_repr_map def start_sim(self): SimPart.start_sim(self) # Bring state machine into initial state self.fsm.start_fsm()
[docs] def create_ports(self, ptype, list_port_names): # Override simPart method to route all events to central handlers """ Convinience functions to create multiple ports at once. :param ptype: Type of ports, must be one of 'in', 'out' or 'io' :param list list_port_names: list of port names to create The function creates for each port a member variable with this name in the part. """ if ptype == "in": for port_name in list_port_names: setattr( self, port_name, self.new_input_port(port_name, self.msg_recv), ) elif ptype == "io": for port_name in list_port_names: setattr( self, port_name, self.new_io_port(port_name, self.msg_recv) ) else: super().create_ports(ptype, list_port_names)
[docs] def create_timers(self, list_timer_names): # Override simPart method to route all events to central handlers """ Convinience functions to create multiple timers at once. :param list list_timer_names: list of timer names to create The function creates for each port a member variable with this name in the part. """ for tmr_name in list_timer_names: setattr(self, tmr_name, self.new_timer(tmr_name, self.tmr_expired))
def msg_recv(self, port, msg): """ receive function for messages If an event name <portname>_Msg exists in the fsm, trigger that event, otherwise call the state_<portName>_Msg(msg) in the fsm """ fsm_event_name = "%s_msg" % port.obj_name() if self.fsm.has_event(fsm_event_name): self.fsm.event(fsm_event_name) else: if not self.fsm.exec_state_dependent_method( fsm_event_name, True, msg ): self.annotation("%s not handled" % fsm_event_name) def tmr_expired(self, timer): """ Timer expired handler If an event name <timerName>_expired exists in the fsm, trigger that event, otherwise call the state_<timerName>_expired() in the fsm """ fsm_event_name = "%s_expired" % timer.obj_name() if self.fsm.has_event(fsm_event_name): self.fsm.event(fsm_event_name) else: if not self.fsm.exec_state_dependent_method(fsm_event_name, True): self.annotation("%s not handled" % fsm_event_name) def fsm_state_change(self, _, new_state): """ Called by the fsm whenever the state changes. Set the part status indicator to new state using the representation map """ text = new_state appearance = {} if self._status_box_repr_map is not None: try: text, appearance = self._status_box_repr_map[new_state] if text is None: text = new_state except KeyError: pass self.set_state_indicator(text, appearance)