Source code for moddy.fsm

"""
:mod:`fsm` -- Moddy Finite State Machine
=======================================================================

.. module:: fsm
   :synopsis: A general finite state machine with hierarchical state support
.. moduleauthor:: Klaus Popp <klauspopp@gmx.de>

"""


def is_sub_fsm_specification(name_cls_tuple):
    """
    Test if the tuple from a transition list (name, classType)
    is a subFsm specification"""
    _, cls = name_cls_tuple
    if isinstance(cls, type):
        return cls
    return None


[docs]class Fsm: # pylint: disable=too-many-instance-attributes """ A finite state machine. Subclass your FSM from this class. Example:: class Computer(Fsm): def __init__(self): transitions = { '': [('INITIAL', 'off')], 'off': [('PowerApplied', 'standby')], 'standby': [('PowerButtonPressed', 'normal_op')], 'normal_op': [('PowerButtonPressed', 'standby'), ('OsShutdown', 'standby')], 'any': [('PowerRemoved', 'off')] } super().__init__( dictTransitions=transitions ) The special *ANY* state means that the transitions can be initiated from ANY state. The special *INITIAL* event must be in the '' (uninitialized) state and specifies the INITIAL transistion which is triggered by :meth:`.start_fsm`. You can define entry and exit method that are executed when a state is entered or left. These methods must follow the naming convention ``state_<statename>_<entry/exit>`` They don't need to exist. They are called only if they are defined. Note that entry and exit actions are NOT called at self transitions (transitions to the current state):: # Off actions def state_off_entry(self): print("state_off_entry") def state_off_exit(self): print("state_off_exit") You can also define a "do" Method that is invoked * after the "Entry" methode * at self transistions to the state These methods must follow the naming convention ``state_<statename>_do`` Such routines can be also defined for the special *ANY* state. If they exist they are called at the entry or exit or self transitions to/from any state. .. note: You cannot define actions for transitions! Use the fsm as follows:: comp = Computer() comp.start_fsm() # sets the state machine to its initial state comp.event('PowerApplied') print("State %s" % comp.state) comp.event('PowerButtonPressed') print("State %s" % comp.state) comp.event('PowerRemoved') print("State %s" % comp.state) You can call :meth:`.exec_state_dependent_method` to execute a state specific method of the fsm. e.g. ``exec_state_dependent_method('msg', 123)`` calls ``state_<currentStateName>_msg( 123 )`` (e.g. the simFsmPart uses it to execute the _msg and _expiration functions) **Hierarchically Nested State Support** https://en.wikipedia.org/wiki/UML_state_machine#Hierarchically_nested_states Rules: Nested states are defined by the user in the transition list: Main FSM:: transitions = { '': [('INITIAL', 'off')], 'off': [('PowerApplied', 'standby')], 'standby': [('PowerButtonPressed', 'normal_op')], 'normal_op': [ ####### NESTED FSM ('fsm-Name', Class-Name) ('fsm-name' , subfsm), ('PowerButtonPressed', 'standby'), ('OsShutdown', 'standby')], 'any': [('PowerRemoved', 'off')] } * A nested FSM is instantiated when the upper level state is entered * A nested FSM cannot exit * A nested FSM receives all events from the upper level FSM. \ If the event is not known in the nested FSM, \ it is directed to the upper FSM. Events that are known in the \ nested FSM are NOT directed to upper FSM * If the upper state exits, the exit action of the current states \ (first, the state in the nested fsm, \ then the upper fsm) are called. Then the nested fsm is terminated. * Orthogonal nested states are also supported. Meaning, multiple \ nested fsms exist in parallel. Just \ enter multiple subFsms in the transition list of a state. * For nested statemachines, the following methods are usefull: - :meth:`.top_fsm` gives you the reference to the top level Fsm. \ E.g. to fire an event to the top Fsm. - :meth:`moddy_part` gives you the moddy part where the state machine \ is contained, regardless of the fsm nesting level :param dict dict_transitions: a dictionary, with the transitions: The dict key is the state, and the values are a list of transition from that state. Each transition consists of a tuple (event, targetState). :param Fsm parent_fsm: The parent Finite State Machine. None if no parent. """ def __init__(self, dict_transitions, parent_fsm=None): self.state = None self.parent_fsm = parent_fsm self.the_moddy_part = None # set by simFsmPart # set reference to top level Fsm if parent_fsm is None: self._top_fsm = self else: self._top_fsm = parent_fsm._top_fsm self._list_child_fsms = [] # currently ACTIVE children self._dict_transitions = dict_transitions self._state_change_callback = None self._list_events = [] # Validate transitions and build list of events for _, list_trans in dict_transitions.items(): for trans in list_trans: if is_sub_fsm_specification(trans) is None: event, to_state = trans if event not in self._list_events: self._list_events.append(event) if not self.state_exists(to_state): raise RuntimeError( "to_state %s doesn't exist" % to_state ) if to_state == "any": raise RuntimeError("ANY cannot be a target state") # check for initial event and remove it try: idx_initial = self._list_events.index("INITIAL") except ValueError: raise Exception("INITIAL event missing") del self._list_events[idx_initial] # # Public API # def get_dict_transitions(self): """ return the transition dictionary """ return self._dict_transitions
[docs] def exec_state_dependent_method(self, method_name, deep, *args, **kwargs): """ Execute the state specific methods: The method ``self.state_any_<method_name>(*args,**kwargs)`` is called if it exists. The method ``self.state_<stateName>_<method_name>(*args,**kwargs)`` is called if it exists. :param method_name: method name to call :param deep: if True, then for each currently active sub_fsm, the _exec_state_method is called :return: True if at least one method exists """ handled = 0 if deep is True: for sub_fsm in self._list_child_fsms: if sub_fsm.exec_state_dependent_method( method_name, True, *args, **kwargs ): handled += 1 if self._exec_state_method("any", method_name, *args, **kwargs): handled += 1 if self._exec_state_method(self.state, method_name, *args, **kwargs): handled += 1 return handled > 0
[docs] def set_state_change_callback(self, callback): """ Register a method that is called whenever the state of the fsm changes :param callback: function to be called on state changes """ self._state_change_callback = callback
[docs] def has_event(self, ev_name): """ check if the event is known by the fsm or a currently active statemachine. :return: True if event is known by the fsm or a currently active \ statemachine """ ret_val = False if ev_name in self._list_events: ret_val = True else: for sub_fsm in self._list_child_fsms: if sub_fsm.has_event(ev_name): ret_val = True break return ret_val
[docs] def start_fsm(self): """ start the FSM. Fire event ``INITIAL`` """ if self.state is not None: raise RuntimeError("start_fsm state wrong") self._event("INITIAL")
[docs] def event(self, ev_name): """ Execute an Event in the *ANY* and current state. :param ev_name: event to execute :raise AssertionError: if the current state is None. :return: True if the event causes a state change, False if not. """ assert self.state is not None, "Did you call start_fsm?" return self._event(ev_name)
[docs] def top_fsm(self): """ get a reference to the topmost Fsm in the hierarchy """ return self._top_fsm
[docs] def moddy_part(self): """ return a reference of the moddy part this fsm is contained in (regardless of the fsm nesting level). return None if it is not included in a moddy part """ part = None try: part = self.top_fsm().the_moddy_part except AttributeError: pass return part
# # Internal methods # def _exec_state_method(self, state, method_name, *args, **kwargs): """ Execute a state specific method that might exist in the fsm subclass The method self.State_<stateName>_<method_name>(*args) is called if it exists, True is returned. If it doesn't exist, nothing happens, but False is returned. """ full_method_name = "state_%s_%s" % (state, method_name) func = getattr(self, full_method_name, None) if func is None: return False func(*args, **kwargs) return True def state_exists(self, state): """ test if state exists """ return state in self._dict_transitions def goto_state(self, state): """ change fsm state """ if state == "any" or self.state_exists(state) is False: raise RuntimeError("goto_state invalid state %s" % state) if self.state != state: # ignore self transitions old_state = self.state # print("+++ %s GOTO STATE %s" % (type(self).__name__,state)) # exit old state if self.state is not None: # terminate subFsms self.terminate_sub_fsms() # call current state Exit method self.exec_state_dependent_method("exit", False) # enter new state self.state = state self.exec_state_dependent_method("entry", False) # Start any possible nested fsms self.start_sub_fsms() if self._state_change_callback is not None: self._state_change_callback(old_state, self.state) # in any case, execute the "Do" Method of the current state if self.state == state: # only execute this if the state was not again # changed by the Entry methods... self.exec_state_dependent_method("do", False) # print("+++ RETURN FROM %s GOTO STATE %s" % # (type(self).__name__,state)) def _event(self, ev_name): """ Execute an Event in the "ANY" and current state. Returns True if the event causes a state change, False if not. """ # Check if there is a matching transition old_state = self.state # first, check if the current state has subFsms which handle the event # if event handled by subFsm, ignore the event for this fsm if not self.pass_event_to_sub_fsms(ev_name): # Check all transitions in ANY state and current state trans_lists = [] try: trans_lists.append(self._dict_transitions["any"]) except KeyError: # ANY state may not exist pass if self.state is not None: trans_lists.append(self._dict_transitions[self.state]) else: # events in uninitialized state trans_lists.append(self._dict_transitions[""]) # print("+++ %s EVENT %s in state %s" % # (type(self).__name__, ev_name, self.state)) for trans_list in trans_lists: for trans in trans_list: if is_sub_fsm_specification(trans) is None: event, to_state = trans if event == ev_name: # print("+++ %s TRANS %s -> %s" % # (type(self).__name__,self.state, to_state)) self.goto_state(to_state) break return old_state != self.state def pass_event_to_sub_fsms(self, ev_name): """ check if the current state has subFsms which handle the event if event handled by sub_fsm, return True """ handled = False for sub_fsm in self._list_child_fsms: if sub_fsm.has_event(ev_name): sub_fsm.event(ev_name) # print("Event %s handled by sub_fsm %s" % # (ev_name, type(sub_fsm).__name__)) handled = True return handled def start_sub_fsms(self): """ start all subfsms in current master state """ trans_list = self._dict_transitions[self.state] for trans in trans_list: sub_fsm_cls = is_sub_fsm_specification(trans) if sub_fsm_cls is not None: # create new fsm sub_fsm = sub_fsm_cls(parentFsm=self) # add sub_fsm to list of active subFsms self._list_child_fsms.append(sub_fsm) # goto initial state sub_fsm.start_fsm() def terminate_sub_fsms(self): """ terminate all started sub fsms """ for sub_fsm in self._list_child_fsms: sub_fsm.exec_state_dependent_method("exit", False) self._list_child_fsms = []