Source code for moddy.lib.net.eth_unmanaged_switch

"""
:mod:`eth_unmanaged_switch` -- Unmanaged Ethernet Switch
=========================================================

.. module:: eth_unmanaged_switch
   :synopsis: Unmanaged Ethernet Switch
.. moduleauthor:: Klaus Popp <klauspopp@gmx.de>
"""

from moddy import SimPart
from moddy.lib.net.ethernet import eth_bcast_addr, eth_flight_time, eth_hdr_len


[docs]class EthUnmanagedSwitch(SimPart):
[docs] def __init__(self, sim, obj_name, num_ports, net_speed): super().__init__(sim=sim, obj_name=obj_name) self._num_ports = num_ports self._net_speed = net_speed self._lookup_table = {} # key=macAddr, value=NetPort self._net_ports = [] for port_num in range(num_ports): self._net_ports.append(self.NetPort(self, port_num))
[docs] def lookup_mac_addr(self, mac_addr): """ lookup macAddr in lookupTable. :param str macAddr: address to lookup (e.g. '00:11:22:33:44:55') :return: NetPort where macAddr is known. None if not found or if it macAddr is broadcast """ if mac_addr != eth_bcast_addr() and mac_addr in self._lookup_table: return self._lookup_table[mac_addr] else: return None
[docs] def add_mac_to_lookup_table(self, net_port, mac_addr): """ Add macAddr tp lookupTable. Update if it is already in lookup Table :param NetPort netPort: netPort where this macAddr is attached to :param str macAddr: address to addr (e.g. '00:11:22:33:44:55') """ self._lookup_table[mac_addr] = net_port
class NetPort: def __init__(self, switch, port_num): self._switch = switch self._net_speed = switch._net_speed # create network port self._net_port = switch.new_io_port("Port%d" % port_num, None) self._net_port.set_msg_started_func(self.net_port_recv_start) # create a port that simulates the cut-through delay self._cut_through_del_port = switch.new_io_port( "CutThroughDelPort%d" % port_num, self.cut_through_del_port_recv, ) self._cut_through_del_port.loop_bind() def net_port_recv_start(self, in_port, pdu, out_port, flight_time): self._cut_through_del_port.send( pdu, eth_flight_time(self._net_speed, eth_hdr_len()) ) def cut_through_del_port_recv(self, in_port, pdu): dst_addr = pdu["dst"] src_addr = pdu["src"] # Add source addr to lookupTable self._switch.add_mac_to_lookup_table(self, src_addr) # check which port has destination address dst_port = self._switch.lookup_mac_addr(dst_addr) if src_addr != dst_addr: if dst_port is None: # forward to all ports (except my own) for net_port in self._switch._net_ports: if net_port is not self: net_port.send_pdu(pdu) else: # forward to specific port dst_port.send_pdu(pdu) def send_pdu(self, pdu): self._net_port.send( pdu, eth_flight_time(self._net_speed, pdu.byte_len()) )