Remote Controlled ThreadsΒΆ

Covers:

  • How to use a remote controlled thread.

  • Thread execptions and assertions

This demo demonstrates remote controlable vThreads. The MyRCThread is instantiated with the remote_controlled attribute, which means it will have a _thread_control_port, through which it can be started and stopped.

The Stim object starts and kills the MyRCThread.

This demo also shows

  • That messages on input ports get lost while a thread is dead

  • That thread instance variables (e.g. thread_invocation_count) survive a thread restart

  • What happens if a thread calls assertion_failed

  • What happens if a thread throws an Exception

Warning

It is intended that this demo reports

  1. an assertion

  2. an exception

To demonstrate what happens if threads throw exceptions or model assertions

"""
Created on 17.12.2018

@author: klauspopp@gmx.de

Demonstrate remote controlable vThreads

NOTE: It is intended that this demo reports
1) an assertion
2) an exception
To demonstrate what happens if threads throw exceptions or model assertions
"""
import moddy


class MyRcThread(moddy.VThread):
    def __init__(self, sim):
        super().__init__(
            sim=sim,
            obj_name="rcThread",
            parent_obj=None,
            remote_controlled=True,
        )
        self.create_ports("QueingIn", ["from_util_port"])
        self.thread_invocation_count = 0

    def run_vthread(self):
        # variables stored in the simPart object (self) are
        # persistant through thread restarts
        self.thread_invocation_count += 1

        self.annotation("invocation %d" % self.thread_invocation_count)
        self.busy(20, "1", moddy.BC_WHITE_ON_GREEN)

        # This shows that arriving messages are lost while the thread is dead
        for _ in range(20):
            self.wait(2)
            while self.from_util_port.n_msg() > 0:
                self.annotation("Got %s" % self.from_util_port.read_msg())

        # In the 4th invocation generate a model assertion failure
        if self.thread_invocation_count == 4:
            self.assertion_failed("4rd invocation assertion")

        # In the 5th invocation simulate an exception.
        # This terminates the thread and the simulator
        if self.thread_invocation_count == 5:
            raise ValueError("Test what happens in case of thread exceptions")

        self.busy(20, "2", moddy.BC_WHITE_ON_BLUE)


def util_prog(self):
    count = 0
    while True:
        self.busy(10, "1", moddy.BC_WHITE_ON_RED)
        self.to_rc_port.send(count, 1)
        count += 1


def stim_prog(self):

    # @2s: initial start of rcTread
    self.wait_until(2)
    self.rc_port.send("start", 0)

    # @5s: kill rcThread
    self.wait_until(5)
    self.rc_port.send("kill", 0)

    # @7s: restart rcThread
    self.wait_until(7)
    self.rc_port.send("start", 0)

    # @130s: restart rcThread, it has terminated,
    # because it finished its main loop
    self.wait_until(130)
    self.rc_port.send("start", 0)

    # @180s: kill rcThread
    self.wait_until(180)
    self.rc_port.send("kill", 0)

    # @200s: restart rcThread, it has terminated because it has been killed
    self.wait_until(200)
    self.rc_port.send("start", 0)

    # @290s: restart rcThread, it has terminated because
    # it finished its main loop
    self.wait_until(290)
    self.rc_port.send("start", 0)
    self.wait(70)


if __name__ == "__main__":
    SIMU = moddy.Sim()
    SIMU.tracing.set_display_time_unit("s")

    SCHED = moddy.VtSchedRtos(sim=SIMU, obj_name="sched", parent_obj=None)
    rc_thread = MyRcThread(SIMU)
    util_thread = moddy.VThread(
        sim=SIMU,
        obj_name="utilThread",
        target=util_prog,
        elems={"out": "to_rc_port"},
    )
    SCHED.add_vthread(rc_thread, 0)
    SCHED.add_vthread(util_thread, 1)

    STIM = moddy.VSimpleProg(
        sim=SIMU, obj_name="Stim", target=stim_prog, elems={"out": "rc_port"}
    )

    SIMU.smart_bind(
        [
            ["rcThread._thread_control_port", "Stim.rc_port"],
            ["utilThread.to_rc_port", "rcThread.from_util_port"],
        ]
    )

    # let simulator run
    try:
        SIMU.run(stop_time=400, stop_on_assertion_failure=False)

    except Exception:
        raise
    finally:
        # create SVG drawing
        moddy.gen_interactive_sequence_diagram(
            sim=SIMU,
            file_name="output/6_vthread_remote_controlled.html",
            show_parts_list=["utilThread", "rcThread", "Stim"],
            excluded_element_list=["allTimers"],
            title="remote controlled vThreads Demo",
            time_per_div=10,
            pix_per_div=30,
        )

The simulation outputs: