Source code for moddy.lib.pdu

"""
Created on 27.04.2018

@author: klauspopp@gmx.de
"""


[docs]def empty_pdu(pdu_type): """ return a empty PDU """ return Pdu(pdu_type, {}, 0)
[docs]class Pdu(dict): """ Representation of a protocol data unit The Pdu is represented as a dictionary, containing the modelled protocol pdu fields. The additional member '_byte_len' represents the real byte length of all pdu fields .. code-block:: dict={ 'm1': m1value, 'm2': m2value }, _byte_len=10 Pdus can be nested. :param str pdu_type: A string describing the type of the Pdu \ (without 'Pdu'), e.g. 'Eth' :param dict mapping: dictionary with member:value pairs, value can be \ a sub-Pdu :param byte_len: top level byte Len - subPdus len will be added """
[docs] def __init__(self, pdu_type, mapping, byte_len): if type(mapping) is Pdu: raise ValueError( "Pdu cannot be initialized with mapping of type Pdu" ) self.pdu_type = pdu_type dict.__init__(self, mapping) # set top level bitLen self._byte_len = byte_len
[docs] @classmethod def is_pdu(cls, obj): """ Check if obj is a Pdu or a subclass of Pdu """ return issubclass(obj.__class__, cls)
[docs] def byte_len(self): """ Return the length of the Pdu in bytes, including all its nested PDUs """ byte_len = self._byte_len for value in self.values(): if Pdu.is_pdu(value): byte_len += value.byte_len() return byte_len
[docs] def fill_up(self, n_bytes): """ Fill top level PDU up, so it has a byte_len of n_bytes if PDU already larger, raise Attribute error """ if self.byte_len() > n_bytes: raise AttributeError( "Pdu is longer than fillup value %d/%d" % (self.byte_len(), n_bytes) ) self._byte_len = n_bytes - self.byte_len() + self._byte_len
def __repr__(self): """ Return a string showing the top level items in a PDU """ s = "%sPdu(%d)" % (self.pdu_type, self.byte_len()) for key, value in self.items(): if s != "": s += " " if Pdu.is_pdu(value): s += ( key + "=" + "%sPdu(%d)" % (value.pdu_type, value.byte_len()) ) else: s += key + "=" + value.__str__() return s
[docs] def dump(self): """ Return a string as __repr()__, but also dump sub-pdus in separated, intended lines, e.g.: .. code-block:: IpPdu(1220) ihl=14 flags=0 src=192.1.1.2 dst=192.1.1.8 payld=RawPdu(1000) payld2=RawPdu(200) payld:RawPdu(1000) raw=IPPAYLOAD payld2:RawPdu(200) raw=AnotherPayld """ return self._dump("", self, 0)
def _dump(self, key, value, level): indent_fmt = "%" + ("%d" % (3 * level)) + "s" indent = indent_fmt % "" s = indent + ("" if key == "" else key + ":") + value.__repr__() + "\n" for key, value in value.items(): if Pdu.is_pdu(value): s += self._dump(key, value, level + 1) return s
[docs] def split_to_fragments(self, pdu_type, max_frag_byte_len): """ Split the Pdu into fragments and return list of fragments Each fragment is represented as a Pdu .. code-block:: Pdu( "<orgtype>Frag", { 'fr': (<offset of fragment>, <len of fragment>), 'org'=[<original Pdu>], fraglen ) Note: the 'org' member is transferred with every fragment. It is enclosed in a list, so that it is not included in the fragment's byte length. The PduDefragmenter class can be used to defragment the fragments :param string pdu_type: pdu_type to set for fragments :param int max_frag_byte_len: maximum bytes per fragment :return list: list of fragments """ frags = [] frag_off = 0 org_pdu_len = self.byte_len() while True: frag_len = org_pdu_len - frag_off if frag_len > max_frag_byte_len: frag_len = max_frag_byte_len frag = Pdu( pdu_type, {"fr": (frag_off, frag_len), "org": [self]}, frag_len ) frags.append(frag) frag_off += frag_len if frag_off >= org_pdu_len: break return frags
[docs]class PduDefragmenter(object): """ Class to defragment a PDU that has been fragmented with split_to_fragments(). Create an instance of this class and add all received fragments with add_fragment(). Fragments may be received in any order. Whether the defragmentation is complete, can be checked by calling the_pdu() or defrag_complete_info(). """
[docs] def __init__(self): self.pdu = None self.frags = [] # list with tuples (off,len) of received fragments
[docs] def add_fragment(self, frag): """ Add a received fragment """ self.frags.append(frag["fr"]) self.pdu = frag["org"][0]
[docs] def the_pdu(self): """ Return the complete defragmented PDU. If fragments are missing, return None. """ if self.defrag_complete_info() is None: return self.pdu else: return None
[docs] def defrag_complete_info(self): """ Check if defragmentation complete. :return string: None if complete or string with info with \ first missing frag """ off = 0 while True: frag = self.has_offs(off) if frag is None: return "missing frag at offs %d" % off off += frag[1] if off >= self.pdu.byte_len(): break return None
def has_offs(self, off): for frag in self.frags: if frag[0] == off: return frag return None