Serial Gateway


  • Sequential Program Simulation

  • Multitasking

  • Moddy output port send queuing behavior

This demo models a serial gateway between a network port and a serial port. The gateway has a CPU that runs two threads, an RxThread and a TxThread, which are scheduled by Moddy’s built-in RTOS scheduler simulation.

The RxThread waits for data from the serial device and forwards it to the network port, which is connected to the Client program.

The TxThread waits for data from the Client and forwards it to the serial port.

2_sergw: A tutorial that models a serial gateway to show the use of Moddy

@author: Klaus Popp
# because the filename doesn't conform to snake case style ...
# pylint: disable=C0103

import moddy
from moddy import US, MS

class Gateway(moddy.SimPart):
    '''
    Model of the Gateway.

    The gateway part ("GW") owns an RTOS scheduler and two threads:

    * ``RxThr`` reads characters from its ``ser_port`` input and sends
      them as one message on its ``net_port`` output.
    * ``TxThr`` reads messages from its ``net_port`` input and sends
      them character by character on its ``ser_port`` output.
    '''

    def __init__(self, sim):
        '''
        Create the gateway with its scheduler and threads.

        :param sim: the simulator instance the gateway belongs to
        '''
        # Initialize the parent class
        super().__init__(sim=sim, obj_name="GW")

        # Create a scheduler
        self.sched = moddy.VtSchedRtos(sim=sim, obj_name="sched",
                                       parent_obj=self)

        # Create a Rx and a Tx thread.
        # parent_obj=self makes the threads children of "GW", so that they
        # are addressable as 'GW.RxThr' / 'GW.TxThr' when binding ports.
        self.rx_thread = moddy.VThread(sim=sim, obj_name="RxThr",
                                       target=self.rx_thread_task,
                                       parent_obj=self,
                                       elems={'QueuingIn': 'ser_port',
                                              'out': 'net_port'})
        self.tx_thread = moddy.VThread(sim=sim, obj_name="TxThr",
                                       target=self.tx_thread_task,
                                       parent_obj=self,
                                       elems={'QueuingIn': 'net_port',
                                              'out': 'ser_port'})

        # add threads to scheduler
        self.sched.add_vthread(self.rx_thread, prio=1)
        self.sched.add_vthread(self.tx_thread, prio=2)

    @staticmethod
    def rx_thread_task(v_thead: moddy.VThread):
        '''
        Gateway receive thread.

        Endless loop: wait for serial characters, read all of them,
        keep at most 8 (simulated FIFO depth) and send the result as one
        message to the network port.

        :param v_thead: the instance of the vThread executing this task
        '''
        # Simulated depth of the HW receive FIFO, in characters
        fifo_depth = 8

        while True:
            # Wait until serial data available
            if v_thead.ser_port.n_msg() == 0:
                v_thead.wait(timeout=None, ev_list=[v_thead.ser_port])

            # Read serial data. Simulate read from HW Fifo
            # (each message is only one char)
            n_chars = v_thead.ser_port.n_msg()
            msg_str = ''.join(v_thead.ser_port.read_msg()
                              for _ in range(n_chars))

            # More characters queued than the FIFO can hold:
            # annotate the overflow and keep only the first ones
            if n_chars > fifo_depth:
                v_thead.annotation('FIFO overflow!')
                n_chars = fifo_depth
                msg_str = msg_str[:n_chars]

            # Simulate reading from HW Fifo takes time
            # (20us per char, really slow CPU...)
            v_thead.busy(n_chars * 20 * US, 'RFIFO', moddy.BC_WHITE_ON_RED)

            # push data to network
            v_thead.busy(150 * US, 'TXNET', moddy.BC_WHITE_ON_GREEN)

            v_thead.net_port.send(msg_str, 100 * US)

    @staticmethod
    def tx_thread_task(v_thread: moddy.VThread):
        '''
        Gateway transmit thread.

        Endless loop: wait for a message from the network port, read one
        message and send each of its characters to the serial port.

        :param v_thread: the instance of the vThread executing this task
        '''
        while True:
            # Wait until network data available
            if v_thread.net_port.n_msg() == 0:
                v_thread.wait(timeout=None, ev_list=[v_thread.net_port])

            v_thread.busy(100 * US, 'RXNET', moddy.BC_WHITE_ON_GREEN)

            # read one message
            msg = v_thread.net_port.read_msg()

            # 20us per character to be transmitted
            v_thread.busy(len(msg) * 20 * US, 'TXFIFO', moddy.BC_WHITE_ON_RED)

            # push to serial port, one message per character
            for c in msg:
                v_thread.ser_port.send(c, ser_flight_time(c))

def client_prog(v_thread: moddy.VThread):
    '''
    Network client program.

    Repeats a fixed transmit schedule forever: an optional idle wait,
    then one message sent on ``net_port``, then a busy phase shown with
    its own status text and colour.

    :param v_thread: the instance of the vThread executing this program
    '''
    flight_time = 100 * US
    busy_time = 100 * US

    # One entry per message:
    # (idle time before sending or None, payload, status text, status colour)
    schedule = (
        (1.2 * MS, 'test', 'TX1', moddy.BC_WHITE_ON_BLUE),
        (None, 'test1', 'TX2', moddy.BC_WHITE_ON_RED),
        (2.3 * MS, 'Data1', 'TX3', moddy.BC_WHITE_ON_GREEN),
    )

    while True:
        for idle_time, payload, status, colour in schedule:
            if idle_time is not None:
                v_thread.wait(idle_time)
            v_thread.net_port.send(payload, flight_time)
            v_thread.busy(busy_time, status, colour)

def ser_dev_prog(v_thread: moddy.VThread):
    '''
    Serial device program.

    Forever alternates between two bursts of serial output; every
    character of a burst is sent as its own message on ``ser_port``.

    :param v_thread: the instance of the vThread executing this program
    '''
    # NOTE(review): the original comment here says "set blue color for
    # messages from SerDev", but no statement follows it -- confirm
    # whether a colour-setting call is missing.

    # (pause before the burst, text of the burst)
    bursts = (
        (2 * MS, 'abc'),
        (1 * MS, 'Hello-World'),
    )

    while True:
        for pause, text in bursts:
            # Generate some serial output after the pause
            v_thread.wait(pause)
            for char in text:
                v_thread.ser_port.send(char, ser_flight_time(char))

def ser_flight_time(tx_string, *, baudrate=115200, bits_per_char=10):
    '''
    Compute flight time for tx_string on a serial line.

    :param tx_string: the characters to transmit; only its length is used
    :param baudrate: line speed in bits per second (default 115200)
    :param bits_per_char: bits on the wire per character (default 10;
        presumably 1 start + 8 data + 1 stop bit -- confirm)
    :return: transmission time in seconds, 0 for an empty string
    '''
    time_per_char = (1.0 / baudrate) * bits_per_char
    return time_per_char * len(tx_string)

if __name__ == '__main__':
    # Build the model: a network client, a serial device and the gateway
    SIMU = moddy.Sim()

    CLIENT = moddy.VSimpleProg(sim=SIMU, obj_name="Client",
                               target=client_prog,
                               elems={'QueuingIO': 'net_port'})
    SERDEV = moddy.VSimpleProg(sim=SIMU, obj_name="SerDev",
                               target=ser_dev_prog,
                               elems={'QueuingIO': 'ser_port'})
    GATEWAY = Gateway(SIMU)

    # Bind ports: each pair connects an output of one part to the
    # matching input of the other
    SIMU.smart_bind([
        ['SerDev.ser_port_out', 'GW.RxThr.ser_port'],
        ['SerDev.ser_port_in', 'GW.TxThr.ser_port'],
        ['Client.net_port_in', 'GW.RxThr.net_port'],
        ['Client.net_port_out', 'GW.TxThr.net_port'],
    ])

    # let simulator run
    try:
        # NOTE(review): only "* MS)" of this statement is visible in the
        # source; the stop time of 12 ms is an assumption -- confirm.
        SIMU.run(stop_time=12 * MS)
    finally:
        # create sequence diagram, even if the simulation raised
        # NOTE(review): the output file name is not visible in the source;
        # it is chosen to match the structure graph below -- confirm.
        moddy.gen_interactive_viewer(
            SIMU,
            file_name="output/2_sergw.html",
            show_parts_list=["Client", "GW.RxThr",
                             "GW.TxThr", "SerDev"],
            time_per_div=50 * US,
            title="Serial Gateway Demo")

    # Output model structure graph
    moddy.gen_dot_structure_graph(SIMU, 'output/2_sergw_structure.svg')

The simulation outputs: