summaryrefslogtreecommitdiff
path: root/test/tuntap
diff options
context:
space:
mode:
Diffstat (limited to 'test/tuntap')
-rw-r--r--test/tuntap/__init__.py21
-rw-r--r--test/tuntap/char_dev_harness.py173
-rw-r--r--test/tuntap/interface_harness.py292
-rw-r--r--test/tuntap/ioctl.py31
-rw-r--r--test/tuntap/packet.py531
-rw-r--r--test/tuntap/packet_codec.py244
-rw-r--r--test/tuntap/packet_reader.py271
-rw-r--r--test/tuntap/route.py112
-rw-r--r--test/tuntap/sockaddr.py124
-rw-r--r--test/tuntap/test_char_dev.py86
-rw-r--r--test/tuntap/test_interface.py120
-rw-r--r--test/tuntap/test_ip.py234
-rw-r--r--test/tuntap/tun_tap_harness.py96
-rw-r--r--test/tuntap/tun_tap_test_case.py40
-rw-r--r--test/tuntap/tuntap_tests.py83
15 files changed, 2458 insertions, 0 deletions
diff --git a/test/tuntap/__init__.py b/test/tuntap/__init__.py
new file mode 100644
index 0000000..664bdaa
--- /dev/null
+++ b/test/tuntap/__init__.py
@@ -0,0 +1,21 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/test/tuntap/char_dev_harness.py b/test/tuntap/char_dev_harness.py
new file mode 100644
index 0000000..515e50b
--- /dev/null
+++ b/test/tuntap/char_dev_harness.py
@@ -0,0 +1,173 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import errno
+import fcntl
+import io
+import os
+import struct
+from tuntap import ioctl
+
+class CharDevHarness(object):
+ """
+ Base class for the tun and tap character device harnesses. Manages a single character
+ interface, keeps the file descriptor and handles I/O.
+ """
+
+ _MAX_CHAR_DEV = 16
+ _MAX_PACKET_SIZE = 4096
+
+ def __init__(self, class_name, unit = None):
+ """
+ Initializes the harness.
+
+ Args:
+ class_name: Path name pattern.
+ unit: The character device number.
+ """
+ self._class_name = class_name
+ self._unit = unit
+ self._dev = None
+
+ def _openCharDev(self, unit):
+ """
+ Opens the character device.
+
+ Args:
+ unit: The character device number.
+ """
+ assert not self._dev
+
+ name = self._class_name % unit
+ self._dev = os.open(name, os.O_RDWR)
+
+ def open(self):
+ """
+ Opens the character device.
+ """
+ if self._unit != None:
+ self._openCharDev(self._unit)
+ return
+
+ # Try to open character devices in turn.
+ for i in xrange(0, self._MAX_CHAR_DEV):
+ try:
+ self._openCharDev(i)
+ self._unit = i
+ return
+ except OSError as e:
+ if e.errno != errno.EBUSY:
+ raise e
+
+ # All devices busy.
+ raise OSError(errno.EBUSY)
+
+ def close(self):
+ """
+ Closes the character device.
+ """
+ assert self._dev
+ os.close(self._dev)
+ self._dev = None
+
+ def fileno(self):
+ assert self._dev
+ return self._dev
+
+ def send(self, packet):
+ assert self._dev
+ os.write(self._dev, packet)
+
+ def ioctl(self, cmd, format, arg):
+ """
+ Performs an ioctl on the character device.
+
+ Args:
+ cmd: the ioctl cmd identifier.
+ format: argument format.
+ arg: argument data tuple.
+
+ Returns:
+ Output argument tuple.
+ """
+ assert self._dev
+ return struct.unpack(format, fcntl.ioctl(self._dev, cmd, struct.pack(format, arg)))
+
+ @property
+ def unit(self):
+ """
+ Returns the interface unit, if known.
+ """
+ return self._unit
+
+
+class TunCharDevHarness(CharDevHarness):
+ """
+ Character device harness for tun devices.
+ """
+
+ TUNSIFHEAD = ioctl.IOC(ioctl.OUT, 't', 96, 'i')
+ TUNGIFHEAD = ioctl.IOC(ioctl.IN, 't', 97, 'i')
+
+ def __init__(self, unit = None):
+ """
+ Initializes the harness.
+
+ Args:
+ unit: Character device index
+ """
+ super(TunCharDevHarness, self).__init__('/dev/tun%d', unit = unit)
+
+ @property
+ def prependAF(self):
+ """
+ Gets the AF prepending flag.
+
+ Returns:
+ A flag indicating whether packets on the char dev are prefixed with the AF number.
+ """
+ return self.ioctl(self.TUNGIFHEAD, 'i', (0))[0]
+
+ @prependAF.setter
+ def prependAF(self, prependAF):
+ """
+ Sets the AF prepending flag.
+
+ Args:
+ prependAF: whether the packets on the char dev are prefixed with the AF number.
+ """
+ self.ioctl(self.TUNSIFHEAD, 'i', (prependAF))
+
+
+class TapCharDevHarness(CharDevHarness):
+ """
+ Character device harness for tap devices.
+ """
+
+ def __init__(self, unit = None):
+ """
+ Initializes the harness.
+
+ Args:
+ unit: Character device index
+ """
+ super(TapCharDevHarness, self).__init__('/dev/tap%d', unit = unit)
diff --git a/test/tuntap/interface_harness.py b/test/tuntap/interface_harness.py
new file mode 100644
index 0000000..bbdade2
--- /dev/null
+++ b/test/tuntap/interface_harness.py
@@ -0,0 +1,292 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import ctypes
+import ctypes.util
+import errno
+import fcntl
+import socket
+import struct
+
+from tuntap import ioctl
+from tuntap.sockaddr import SockaddrDl, SockaddrIn, SockaddrIn6
+
+libc = ctypes.CDLL(ctypes.util.find_library('c'))
+
+class struct_sockaddr(ctypes.Structure):
+ _fields_ = [ ('sa_len', ctypes.c_uint8),
+ ('sa_family', ctypes.c_uint8) ]
+
+class struct_ifaddrs(ctypes.Structure):
+ pass
+
+struct_ifaddrs._fields_ = [ ('ifa_next', ctypes.POINTER(struct_ifaddrs)),
+ ('ifa_name', ctypes.c_char_p),
+ ('ifa_flags', ctypes.c_uint),
+ ('ifa_addr', ctypes.POINTER(struct_sockaddr)),
+ ('ifa_netmask', ctypes.POINTER(struct_sockaddr)),
+ ('ifa_dstaddr', ctypes.POINTER(struct_sockaddr)),
+ ('ifa_data', ctypes.c_void_p) ]
+
+def decodeSockaddr(sockaddr):
+ if not sockaddr:
+ return None
+
+ data = ctypes.string_at(sockaddr, max(sockaddr.contents.sa_len, 16))
+ if sockaddr.contents.sa_family == SockaddrDl.AF_LINK:
+ return SockaddrDl.decode(data)
+ elif sockaddr.contents.sa_family == socket.AF_INET:
+ return SockaddrIn.decode(data)
+ elif sockaddr.contents.sa_family == socket.AF_INET6:
+ return SockaddrIn6.decode(data)
+
+ return None
+
+def getIfAddrs(ifname):
+ ifaddrs = (ctypes.POINTER(struct_ifaddrs))()
+ assert not libc.getifaddrs(ctypes.byref(ifaddrs))
+
+ addrs = []
+ try:
+ entry = ifaddrs
+ while entry:
+ ia = entry.contents
+ entry = ia.ifa_next
+ if ia.ifa_name != ifname:
+ continue
+
+ addrs.append((decodeSockaddr(ia.ifa_addr),
+ decodeSockaddr(ia.ifa_netmask),
+ decodeSockaddr(ia.ifa_dstaddr)))
+ return addrs
+ finally:
+ libc.freeifaddrs(ifaddrs)
+
+
+def ifNameToIndex(ifname):
+ libc.if_nametoindex.restype = ctypes.c_uint
+ index = libc.if_nametoindex(ifname)
+ if not index:
+ raise OSError(ctypes.get_errno)
+ return index
+
+
+class Address(object):
+ """
+ Wraps address parameters for an interface.
+ """
+
+ def __init__(self, af, local, remote, dst, mask):
+ self.af = af
+ self.local = local
+ self.remote = remote
+ self.dst = dst
+ self.mask = mask
+
+ def __makeSaProperty(name):
+ def get(self):
+ addrmap = { socket.AF_INET: SockaddrIn,
+ socket.AF_INET6: SockaddrIn6 }
+ addr = getattr(self, name)
+ if self.af not in addrmap:
+ return None
+ if addr == None:
+ return addrmap[self.af](af = 0, addr = None)
+ return addrmap[self.af](af = self.af, addr = addr)
+
+ return property(get)
+
+ sa_local = __makeSaProperty('local')
+ sa_remote = __makeSaProperty('remote')
+ sa_dst = __makeSaProperty('dst')
+ sa_mask = __makeSaProperty('mask')
+
+
+class InterfaceHarness(object):
+ """
+ Base class for network interface harnesses. Provides helpers to configure the interface.
+ """
+
+ SIOCSIFFLAGS = ioctl.IOC(ioctl.OUT, 'i', 16, '16s16s')
+ SIOCGIFFLAGS = ioctl.IOC(ioctl.INOUT, 'i', 17, '16s16s')
+
+ SIOCAIFADDR = ioctl.IOC(ioctl.OUT, 'i', 26, '16s16s16s16s')
+ SIOCAIFADDR_IN6 = ioctl.IOC(ioctl.OUT, 'i', 26, '16s28s28s28sIiiII')
+ SIOCSIFLLADDR = ioctl.IOC(ioctl.OUT, 'i', 60, '16s16s')
+
+ SIOCGIFMTU = ioctl.IOC(ioctl.INOUT, 'i', 51, '16s16s')
+
+ IFF_UP = 0x1
+ IFF_BROADCAST = 0x2
+ IFF_DEBUG = 0x4
+ IFF_LOOPBACK = 0x8
+ IFF_POINTOPOINT = 0x10
+ IFF_NOTRAILERS = 0x20
+ IFF_RUNNING = 0x40
+ IFF_NOARP = 0x80
+ IFF_PROMISC = 0x100
+ IFF_ALLMULTI = 0x200
+ IFF_OACTIVE = 0x400
+ IFF_SIMPLEX = 0x800
+ IFF_LINK0 = 0x1000
+ IFF_LINK1 = 0x2000
+ IFF_LINK2 = 0x4000
+ IFF_MULTICAST = 0x8000
+
+ def __init__(self, class_name, unit):
+ """
+ Initializes the harness.
+
+ Args:
+ class_name: Interface class name.
+ unit: The interface number.
+ """
+ self._class_name = class_name
+ self._unit = unit
+
+ def _ioctl(self, af, cmd, format, arg):
+ """
+ Performs a socket ioctl.
+
+ Args:
+ af: address family.
+ cmd: the ioctl cmd.
+ format: argument format description.
+ arg: argument data tuple.
+
+ Returns:
+ Output data tuple.
+ """
+ s = socket.socket(af, socket.SOCK_DGRAM)
+ try:
+ return struct.unpack(format, fcntl.ioctl(s, cmd, struct.pack(format, *arg)))
+ finally:
+ s.close()
+
+ @property
+ def flags(self):
+ """
+ Retrieves the interface flags.
+
+ Returns:
+ The interface flags.
+ """
+ return self._ioctl(socket.AF_INET, InterfaceHarness.SIOCGIFFLAGS,
+ '16sH', (self.name, 0))[1]
+
+ @flags.setter
+ def flags(self, flags):
+ """
+ Sets new interface flags.
+
+ Args:
+ flags: new interface flags.
+ """
+ self._ioctl(socket.AF_INET, InterfaceHarness.SIOCSIFFLAGS,
+ '16sH', (self.name, flags))
+
+ @property
+ def mtu(self):
+ """
+ Retrieves the interface MTU.
+
+ Returns:
+ The interface MTU.
+ """
+ return self._ioctl(socket.AF_INET, InterfaceHarness.SIOCGIFMTU,
+ '16si', (self.name, 0))[1]
+
+ @property
+ def name(self):
+ """
+ Gets the interface name.
+
+ Returns:
+ Full interface name.
+ """
+ return "%s%d" % (self._class_name, self._unit)
+
+ @property
+ def index(self):
+ """
+ Gets the interface index.
+
+ Returns:
+ Interface index.
+ """
+ return ifNameToIndex(self.name)
+
+ def getAddrs(self, af = None):
+ def check(addr):
+ if addr and addr.af == af:
+ return addr
+ else:
+ return None
+ return filter(lambda (a, n, d): a != None,
+ map(lambda (a, n, d): (check(a), check(n), check(d)), getIfAddrs(self.name)))
+
+ @property
+ def lladdr(self):
+ entry = self.getAddrs(SockaddrDl.AF_LINK)
+ if entry:
+ return entry[0][0]
+ return None
+
+ @lladdr.setter
+ def lladdr(self, addr):
+ self._ioctl(socket.AF_INET, InterfaceHarness.SIOCSIFLLADDR,
+ '16sBB14s', (self.name, len(addr.addr), addr.af, addr.addr))
+
+ def addIfAddr(self, local, dst, mask):
+ """
+ Set an interface address.
+
+ Args:
+ local: local address.
+ dst: broadcast address or destination address, respectively.
+ mask: the netmask.
+ """
+ self._ioctl(socket.AF_INET, InterfaceHarness.SIOCAIFADDR,
+ '16s16s16s16s', (self.name, local.encode(), dst.encode(), mask.encode()))
+
+ def addIfAddr6(self, local, dst, mask):
+ """
+ Set an INET6 address for the interface.
+
+ Args:
+ local: local address.
+ dst: destination address.
+ mask: the netmask.
+ """
+ # This sometimes fails on Tiger with ENOBUFS. Just retry...
+ ntries = 0
+ while True:
+ try:
+ self._ioctl(socket.AF_INET6, InterfaceHarness.SIOCAIFADDR_IN6,
+ '16s28s28s28sIiiII',
+ (self.name, local.encode(), dst.encode(), mask.encode(),
+ 0, 0, 0, 0xffffffff, 0xffffffff))
+ break
+ except IOError as e:
+ if e.errno != errno.ENOBUFS or ntries > 10:
+ raise e
+ ntries += 1
diff --git a/test/tuntap/ioctl.py b/test/tuntap/ioctl.py
new file mode 100644
index 0000000..641ddf2
--- /dev/null
+++ b/test/tuntap/ioctl.py
@@ -0,0 +1,31 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import struct
+
+VOID = 0x20000000
+IN = 0x40000000
+OUT = 0x80000000
+INOUT = IN | OUT
+
+def IOC(inout, group, num, format):
+ return inout | ((struct.calcsize(format) & 0x1fff) << 16) | (ord(group) << 8) | num
diff --git a/test/tuntap/packet.py b/test/tuntap/packet.py
new file mode 100644
index 0000000..6fbbb73
--- /dev/null
+++ b/test/tuntap/packet.py
@@ -0,0 +1,531 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import socket
+
+class BinStruct(object):
+ """
+ Handles packing and unpacking of binary data. It is vaguely inspired by the struct module but
+ taylored for bit-granular fields. Also, it's probably not very fast :)
+ """
+
+ @staticmethod
+ def str2num(data, width):
+ if not data:
+ return 0
+ len, rem = divmod(width, 8)
+ val = 0
+ for i in range(len):
+ val = (val << 8) | ord(data[i])
+ if rem:
+ val = (val << rem) | (ord(data[len]) & ((1 << rem) - 1))
+ return val
+
+ @staticmethod
+ def num2str(val, width):
+ result = bytearray((width + 7) / 8)
+ p, rem = divmod(width, 8)
+ if rem:
+ result[p] = chr(val & ((1 << rem) - 1))
+ val >>= rem
+ while p > 0:
+ p -= 1
+ result[p] = chr(val & 0xff)
+ val >>= 8
+ return str(result)
+
+ def __init__(self, format):
+ """
+ Initializes a BinStruct object that can encode and decode the binary structure specified in
+ the format parameters.
+
+ Args:
+ format: Specifies the format of the binary data. The syntax is
+
+ (<width><type>)*
+
+ where:
+ width is the width of a component in number of bits
+ type is indicates the type of the component and may be one of:
+ s: binary data
+ n: number
+ """
+ self._format = []
+ id = lambda x, width : x or 0
+ typemap = {
+ 'n': (id, id),
+ 's': (BinStruct.num2str, BinStruct.str2num),
+ }
+ pos = 0
+ self._width = 0
+ while pos < len(format):
+ start = pos
+ while str.isdigit(format[pos]):
+ pos += 1
+ width = int(format[start:pos])
+ self._width += width
+ codec = typemap[format[pos]]
+ self._format.insert(0, (width, codec[0], codec[1]))
+ pos += 1
+
+ @property
+ def size(self):
+ return (self._width + 7) / 8
+
+ def pack(self, *values):
+ """
+ Encodes the passed values according to this BinStruct's format definition.
+
+ Args:
+ values: The values to encode.
+ Returns:
+ The encoded struct as a binary string.
+ """
+ assert len(values) == len(self._format)
+ val = 0
+ pos = len(self._format)
+ for value in values:
+ pos -= 1
+ (width, decode, encode) = self._format[pos]
+ val = (val << width) | (encode(value, width) & ((1 << width) - 1))
+ return BinStruct.num2str(val, self._width)
+
+ def unpack(self, data):
+ """
+ Decodes a binary string according to the format definition.
+
+ Args:
+ data: The binary string to decode.
+ Returns:
+ A value tuple.
+ """
+ assert len(data) >= self.size
+ val = BinStruct.str2num(data, self._width)
+ pos = len(self._format)
+ result = [ None for i in range(pos) ]
+ for (width, decode, encode) in self._format:
+ pos -= 1
+ result[pos] = decode(val & ((1 << width) - 1), width)
+ val >>= width
+ return tuple(result)
+
+
+class Packet(object):
+ """
+ Base class for packet encoding and decoding.
+ """
+
+ def __init__(self, format, names, data = None, **initializer):
+ """
+ Initializes the packet.
+
+ Args:
+ format: Binary format description.
+ names: Names for the packet fields.
+ data: Optional binary packet to decode.
+ initializer: Optional initialization values for the packet fields.
+ """
+ self._struct = BinStruct(format)
+ self._names = names
+ self.__dict__.update(dict.fromkeys(self._names, None))
+ self.payload = None
+
+ if isinstance(data, str):
+ self.decode(data)
+ elif isinstance(data, Packet):
+ self.update(data)
+
+ self.__dict__.update(initializer)
+
+ def __repr__(self):
+ return repr(dict(map(lambda x : (x, getattr(self, x)), self._names + ('payload',))))
+
+ def _payloadPos(self):
+ """
+ Returns: The payload position in the data buffer.
+ """
+ return self._struct.size
+
+ def _decodePayload(self, data):
+ """
+ Decodes the payload data.
+
+ Args:
+ data: Payload data buffer.
+ Returns:
+ The payload object.
+ """
+ return data
+
+ def _encodePayload(self):
+ """
+ Encodes the payload data.
+
+ Args:
+ payload: Payload object.
+ Returns:
+ Encoded payload byte string.
+ """
+ if issubclass(self.payload.__class__, Packet):
+ return self.payload.encode()
+ return str(self.payload)
+
+ def _encodeFields(self, *fields):
+ """
+ Takes a fields tuple and returns encoded field data.
+
+ Args:
+ fields: Field values.
+ Returns:
+ Tuple of encoded fields.
+ """
+ return self._struct.pack(*fields)
+
+ def decode(self, data):
+ """
+ Decode a binary packet.
+
+ Args:
+ data: Binary packet data to decode.
+ """
+ fields = self._struct.unpack(data)
+ assert len(fields) == len(self._names)
+ self.__dict__.update(dict(zip(self._names, fields)))
+ self.payload = self._decodePayload(data[self._payloadPos():])
+
+ def update(self, data):
+ """
+ Update the packet from a dictionary.
+
+ Args:
+ data: The dictionary to update from.
+ """
+ self.__dict__.update(map(lambda x : (x, getattr(data, x)), self._names + ('payload',)))
+ if isinstance(self.payload, str):
+ self.payload = self._decodePayload(self.payload)
+
+
+ def encode(self):
+ """
+ Encodes the packet into binary format.
+
+ Returns:
+ The packet data.
+ """
+ fields = map(lambda x : getattr(self, x), self._names)
+ return self._encodeFields(*fields) + self._encodePayload()
+
+ @property
+ def headerLen(self):
+ """
+ The size of the header according to the format.
+
+ Returns:
+ The header length.
+ """
+ return self._struct.size
+
+
+class TunAFFrame(Packet):
+
+ def __init__(self, data = None, **initializer):
+ super(TunAFFrame, self).__init__('32n', ('af',), data, **initializer)
+
+ def _decodePayload(self, data):
+ if self.af == socket.AF_INET:
+ return IPv4Packet(data)
+ elif self.af == socket.AF_INET6:
+ return IPv6Packet(data)
+ return data
+
+
+class EthernetFrame(Packet):
+
+ TYPE_IPV4 = 0x0800
+ TYPE_ARP = 0x0806
+ TYPE_IPV6 = 0x86dd
+
+ def __init__(self, data = None, **initializer):
+ super(EthernetFrame, self).__init__('48s48s16n', ('dst', 'src', 'type'),
+ data, **initializer)
+
+ def _decodePayload(self, data):
+ if self.type == EthernetFrame.TYPE_IPV4:
+ return IPv4Packet(data)
+ elif self.type == EthernetFrame.TYPE_ARP:
+ return ARPPacket(data)
+ elif self.type == EthernetFrame.TYPE_IPV6:
+ return IPv6Packet(data)
+ return data
+
+
+class ARPPacket(Packet):
+
+ HTYPE_ETHERNET = 0x01
+ HLEN_ETHERNET = 6
+ PTYPE_IPV4 = 0x0800
+ PLEN_IPV4 = 4
+ OPER_REQUEST = 1
+ OPER_REPLY = 2
+
+ def __init__(self, data = None, **initializer):
+ super(ARPPacket, self).__init__('16n16n8n8n16n48s32s48s32s',
+ ('htype', 'ptype', 'hlen', 'plen', 'oper',
+ 'sha', 'spa', 'tha', 'tpa'),
+ data, **initializer)
+
+
+class IPv4Packet(Packet):
+
+ PROTO_ICMP = 0x01
+ PROTO_TCP = 0x06
+ PROTO_UDP = 0x11
+
+ class UDPPseudoHeader(Packet):
+
+ def __init__(self, data = None, **initializer):
+ super(IPv4Packet.UDPPseudoHeader, self).__init__('32s32s8s8n16n',
+ ('src', 'dst',
+ 'padding', 'proto', 'length'),
+ data, **initializer)
+
+
+ def __init__(self, data = None, **initializer):
+ super(IPv4Packet, self).__init__('4n4n6n2n16n16n2n14n8n8n16n32s32s',
+ ('version', 'hdrlen', 'dscp', 'ecn',
+ 'len', 'id', 'flags', 'fragoffset',
+ 'ttl', 'proto', 'checksum', 'src', 'dst'),
+ data, **initializer)
+
+ def _payloadPos(self):
+ return self.hdrlen * 4
+
+ def _decodePayload(self, data):
+ if self.proto == IPv4Packet.PROTO_UDP:
+ return UDPPacket(data)
+ return data
+
+ @staticmethod
+ def computeChecksum(data):
+ """
+ Computes the IPv4 header checksum.
+
+ Args:
+ Header in binary.
+ Returns:
+ The header checksum.
+ """
+ sum = 0
+ for i in range(0, len(data) - 1, 2):
+ sum += ord(data[i]) << 8 | ord(data[i + 1])
+ if len(data) % 2 == 1:
+ sum += ord(data[-1]) << 8 | 0
+ return ~((sum & 0xffff) + (sum >> 16))
+
+ def encode(self):
+ payload = self._encodePayload()
+ hdrlen = self.hdrlen or 5
+ payloadlen = self.len or len(payload)
+ fields = [self.version or 4, hdrlen, self.dscp or 0, self.ecn or 0,
+ payloadlen + hdrlen * 4, self.id or 0, self.flags or 0,
+ self.fragoffset or 0, self.ttl or 255, self.proto, self.checksum or 0,
+ self.src, self.dst]
+
+ # Need to compute UDP checksum here since it includes the IPv4 pseudo header.
+ if (self.proto == IPv4Packet.PROTO_UDP and
+ issubclass(self.payload.__class__, UDPPacket) and
+ self.payload.checksum == None):
+
+ header = IPv4Packet.UDPPseudoHeader(src = self.src, dst = self.dst,
+ proto = IPv4Packet.PROTO_UDP, length = payloadlen,
+ payload = payload)
+ payload = UDPPacket(data = self.payload,
+ checksum = IPv4Packet.computeChecksum(header.encode())).encode()
+
+ header = self._encodeFields(*tuple(fields))
+ if self.checksum == None:
+ fields[10] = IPv4Packet.computeChecksum(header)
+ header = self._encodeFields(*tuple(fields))
+ return header + payload
+
+class IPv6Packet(Packet):
+
+ PROTO_ICMP = 1
+ PROTO_TCP = 6
+ PROTO_UDP = 17
+ PROTO_ICMPV6 = 58
+
+ class UDPPseudoHeader(Packet):
+
+ def __init__(self, data = None, **initializer):
+ super(IPv6Packet.UDPPseudoHeader, self).__init__('128s128s32n24s8n',
+ ('src', 'dst',
+ 'length', 'padding', 'proto'),
+ data, **initializer)
+
+
+ def __init__(self, data = None, **initializer):
+ super(IPv6Packet, self).__init__('4n8n20n16n8n8n128s128s',
+ ('version', 'traffic_class', 'flow_label',
+ 'len', 'proto', 'hop_limit',
+ 'src', 'dst'),
+ data, **initializer)
+
+ def _decodePayload(self, data):
+ if self.proto == IPv6Packet.PROTO_UDP:
+ return UDPPacket(data)
+ elif self.proto == IPv6Packet.PROTO_ICMPV6:
+ return ICMPV6Packet(data)
+ return data
+
+ def encode(self):
+ payload = self._encodePayload()
+ fields = [self.version or 6, self.traffic_class or 0, self.flow_label or 0,
+ self.len or len(payload), self.proto, self.hop_limit or 255,
+ self.src, self.dst]
+
+ # Need to compute checksum for UDP, ICMPV6 here since it includes the IPv6 pseudo header.
+ checksummedProtos = { IPv6Packet.PROTO_UDP: UDPPacket,
+ IPv6Packet.PROTO_ICMPV6: ICMPV6Packet }
+ payloadClass = checksummedProtos.get(self.proto)
+ if (payloadClass != None and
+ issubclass(self.payload.__class__, payloadClass) and
+ self.payload.checksum == None):
+
+ header = IPv6Packet.UDPPseudoHeader(src = self.src, dst = self.dst, length = fields[3],
+ proto = self.proto, payload = payload)
+ payload = payloadClass(data = self.payload,
+ checksum = IPv4Packet.computeChecksum(header.encode())).encode()
+
+ return self._encodeFields(*tuple(fields)) + payload
+
+
+class ICMPV6Packet(Packet):
+
+ TYPE_NEIGHBOR_SOLICITATION = 135
+ TYPE_NEIGHBOR_ADVERTISMENT = 136
+
+ def __init__(self, data = None, **initializer):
+ super(ICMPV6Packet, self).__init__('8n8n16n',
+ ('type', 'code', 'checksum'),
+ data, **initializer)
+
+ def _decodePayload(self, data):
+ if self.type == ICMPV6Packet.TYPE_NEIGHBOR_SOLICITATION:
+ return ICMPV6NeighborSolicitation(data)
+ elif self.type == ICMPV6Packet.TYPE_NEIGHBOR_ADVERTISMENT:
+ return ICMPV6NeighborAdvertisement(data)
+ return data
+
+
+class ICMPV6NeighborDiscoveryOption(Packet):
+
+ TYPE_SOURCE_LINK_LAYER_ADDRESS = 1
+ TYPE_TARGET_LINK_LAYER_ADDRESS = 2
+
+ def __init__(self, data = None, **initializer):
+ super(ICMPV6NeighborDiscoveryOption, self).__init__('8n8n',
+ ('type', 'length'),
+ data, **initializer)
+
+ def encode(self):
+ payload = self._encodePayload()
+ length = self.length
+ if length == None:
+ length = (len(payload) + 2 + 7) / 8
+ payload += '\x00' * (length * 8 - len(payload) - 2)
+ fields = [self.type, length]
+ header = self._encodeFields(*tuple(fields))
+ return header + payload
+
+ @staticmethod
+ def decodeOptions(data):
+ options = []
+ while len(data) > 2:
+ type = ord(data[0])
+ length = ord(data[1])
+ if len(data) < length * 8:
+ break
+ options.append(ICMPV6NeighborDiscoveryOption(type = type, length = length,
+ payload = data[0:length * 8]))
+ data = data[length * 8:]
+ return options
+
+
+class ICMPV6NeighborSolicitation(Packet):
+
+ def __init__(self, data = None, **initializer):
+ super(ICMPV6NeighborSolicitation, self).__init__('32s128s',
+ ('reserved', 'target'),
+ data, **initializer)
+ self.target_lladdr = initializer.get('src_lladdr')
+
+ def _decodePayload(self, data):
+ for option in ICMPV6NeighborDiscoveryOption.decodeOptions(data):
+ if option.type == ICMPV6NeighborDiscoveryOption.TYPE_SOURCE_LINK_LAYER_ADDRESS:
+ self.src_lladdr = option.payload
+ return None
+
+ def _encodePayload(self):
+ if self.src_lladdr:
+ return ICMPV6NeighborDiscoveryOption(
+ type = ICMPV6NeighborDiscoveryOption.TYPE_SOURCE_LINK_LAYER_ADDRESS,
+ payload = self.src_lladdr).encode()
+ return ''
+
+
+class ICMPV6NeighborAdvertisement(Packet):
+
+ def __init__(self, data = None, **initializer):
+ super(ICMPV6NeighborAdvertisement, self).__init__('1n1n1n29s128s',
+ ('router', 'solicited', 'override',
+ 'reserved', 'target'),
+ data, **initializer)
+ self.target_lladdr = initializer.get('target_lladdr')
+
+ def _decodePayload(self, data):
+ for option in ICMPV6NeighborDiscoveryOptions.decodeOptions(data):
+ if option.type == ICMPV6NeighborDiscoveryOption.TYPE_TARGET_LINK_LAYER_ADDRESS:
+ self.target_lladdr = option.payload
+ return None
+
+ def _encodePayload(self):
+ if self.target_lladdr:
+ return ICMPV6NeighborDiscoveryOption(
+ type = ICMPV6NeighborDiscoveryOption.TYPE_TARGET_LINK_LAYER_ADDRESS,
+ payload = self.target_lladdr).encode()
+ return ''
+
+
+class UDPPacket(Packet):
+
+ def __init__(self, data = None, **initializer):
+ super(UDPPacket, self).__init__('16n16n16n16n',
+ ('src', 'dst', 'len', 'checksum'),
+ data, **initializer)
+
+ def encode(self):
+ payload = self._encodePayload()
+ packetlen = self.len or (len(payload) + self.headerLen)
+ fields = [self.src, self.dst, packetlen, self.checksum or 0]
+ header = self._encodeFields(*tuple(fields))
+ return header + payload
diff --git a/test/tuntap/packet_codec.py b/test/tuntap/packet_codec.py
new file mode 100644
index 0000000..0909596
--- /dev/null
+++ b/test/tuntap/packet_codec.py
@@ -0,0 +1,244 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import functools
+import socket
+
+from tuntap.tun_tap_harness import TunHarness, TapHarness
+from tuntap.packet import (
+ ARPPacket,
+ EthernetFrame,
+ ICMPV6Packet,
+ ICMPV6NeighborAdvertisement,
+ ICMPV6NeighborSolicitation,
+ IPv4Packet,
+ IPv6Packet,
+ TunAFFrame,
+ UDPPacket
+)
+from tuntap.packet_reader import PacketReader, SelectPacketSource
+
+class PacketCodec(object):
+ """
+ Helper for tests that wish to send and receive packets. This provides the interface to send and
+ receive packets at the IP/IPv6 level on both the network interface and char dev sides.
+ """
+
+ def __init__(self, af, listenAddress, newHarness, newPacketSource):
+ self._af = af
+ self._listenAddress = listenAddress
+ self._newHarness = newHarness
+ self._newPacketSource = newPacketSource
+
+ def __str__(self):
+ af_map = { socket.AF_INET: 'IN', socket.AF_INET6: 'IN6' }
+ return '<%s<%s, %s>>' % (self.__class__.__name__,
+ af_map[self._af],
+ self._newPacketSource.__name__)
+
+ def _decodePacket(self, packet):
+ return packet
+
+ def _framePacket(self, payload):
+ return payload
+
+ def _frameExpectation(self, expectation):
+ return expectation
+
+ @property
+ def af(self):
+ return self._af
+
+ @property
+ def addr(self):
+ if self._af == socket.AF_INET:
+ return self._harness.addr
+ elif self._af == socket.AF_INET6:
+ return self._harness.addr6
+ assert False
+
+ @property
+ def UDPPort(self):
+ return self._recvSock.getsockname()[1]
+
+ def start(self):
+ self._harness = self._newHarness()
+ self._harness.start()
+ self._harness.up()
+
+ self._sendSock = socket.socket(self.addr.af, socket.SOCK_DGRAM)
+ self._recvSock = socket.socket(self.addr.af, socket.SOCK_DGRAM)
+ self._recvSock.bind((self._listenAddress or self.addr.local, 0))
+
+ self._reader = PacketReader(source = self._newPacketSource(self._harness.char_dev.fileno()),
+ skip = True,
+ decode = lambda packet : self._decodePacket(packet))
+ self._sockReader = PacketReader(source = SelectPacketSource(self._recvSock.fileno()))
+
+ self._reader.start()
+ self._sockReader.start()
+
+ def stop(self):
+ self._sockReader.stop()
+ self._reader.stop()
+ self._harness.stop()
+ self._sendSock.close()
+ self._recvSock.close()
+
+ def sendUDP(self, payload, addr):
+ self._sendSock.sendto(payload, addr)
+
+ def expectUDP(self, expectation):
+ self._sockReader.expect(expectation)
+
+ def runUDP(self):
+ return self._sockReader.run()
+
+ def sendPacket(self, payload):
+ self._harness.char_dev.send(self._framePacket(payload))
+
+ def expectPacket(self, expectation):
+ self._reader.expect(self._frameExpectation(expectation))
+
+ def runPacket(self):
+ return self._reader.run()
+
+
+class TunPacketCodec(PacketCodec):
+
+ def __init__(self, af, listenAddress, newPacketSource):
+ super(TunPacketCodec, self).__init__(af, listenAddress, TunHarness, newPacketSource)
+
+ def _decodePacket(self, packet):
+ # Look at the first byte to figure out whether it's IPv4 or IPv6.
+ version = (ord(packet[0]) & 0xf0) >> 4
+ if version == 4:
+ return IPv4Packet(packet)
+ elif version == 6:
+ return IPv6Packet(packet)
+ else:
+ return packet
+
+
+class TunAFPacketCodec(PacketCodec):
+
+ def __init__(self, af, listenAddress, newPacketSource):
+ super(TunAFPacketCodec, self).__init__(af, listenAddress, TunHarness, newPacketSource)
+
+ def _decodePacket(self, packet):
+ return TunAFFrame(packet)
+
+ def _framePacket(self, payload):
+ return TunAFFrame(af = self.addr.af, payload = payload).encode()
+
+ def _frameExpectation(self, expectation):
+ return { 'af': self.addr.af,
+ 'payload': expectation }
+
+ def start(self):
+ super(TunAFPacketCodec, self).start()
+ self._harness.char_dev.prependAF = 1
+
+
+class TapPacketCodec(PacketCodec):
+
+ TYPE_MAP = { socket.AF_INET: EthernetFrame.TYPE_IPV4,
+ socket.AF_INET6: EthernetFrame.TYPE_IPV6 }
+
+ ETHER_ADDR_ANY = '\xff\xff\xff\xff\xff\xff'
+ ETHER_ADDR_REMOTE = '\x11\x22\x33\x44\x55\x66'
+
+ def __init__(self, af, listenAddress, newPacketSource):
+ super(TapPacketCodec, self).__init__(af, listenAddress, TapHarness, newPacketSource)
+
+ def _decodePacket(self, packet):
+ return EthernetFrame(packet)
+
+ def _framePacket(self, payload):
+ return EthernetFrame(src = TapPacketCodec.ETHER_ADDR_REMOTE,
+ dst = self._harness.interface.lladdr.addr,
+ type = TapPacketCodec.TYPE_MAP[self.addr.af],
+ payload = payload).encode()
+
+ def _frameExpectation(self, expectation):
+ return { 'type': TapPacketCodec.TYPE_MAP[self.addr.af],
+ 'src': self._harness.interface.lladdr.addr,
+ 'payload': expectation }
+
+ def _sendArpReply(self, packet):
+ reply = EthernetFrame(dst = packet.src,
+ src = TapPacketCodec.ETHER_ADDR_ANY,
+ type = EthernetFrame.TYPE_ARP,
+ payload = ARPPacket(htype = ARPPacket.HTYPE_ETHERNET,
+ ptype = ARPPacket.PTYPE_IPV4,
+ hlen = ARPPacket.HLEN_ETHERNET,
+ plen = ARPPacket.PLEN_IPV4,
+ oper = ARPPacket.OPER_REPLY,
+ sha = TapPacketCodec.ETHER_ADDR_REMOTE,
+ spa = packet.payload.tpa,
+ tha = packet.payload.sha,
+ tpa = packet.payload.spa))
+ self._harness.char_dev.send(reply.encode())
+
+ def _sendNeighborAdvertisement(self, packet):
+ reply = EthernetFrame(
+ dst = packet.payload.payload.payload.src_lladdr,
+ src = TapPacketCodec.ETHER_ADDR_ANY,
+ type = EthernetFrame.TYPE_IPV6,
+ payload = IPv6Packet(
+ src = socket.inet_pton(self.addr.af, self.addr.remote),
+ dst = packet.payload.src,
+ proto = IPv6Packet.PROTO_ICMPV6,
+ payload = ICMPV6Packet(
+ type = ICMPV6Packet.TYPE_NEIGHBOR_ADVERTISMENT,
+ payload = ICMPV6NeighborAdvertisement(
+ solicited = 1,
+ override = 1,
+ target = socket.inet_pton(self.addr.af, self.addr.remote),
+ target_lladdr = TapPacketCodec.ETHER_ADDR_REMOTE))))
+ self._harness.char_dev.send(reply.encode())
+
+ def start(self):
+ super(TapPacketCodec, self).start()
+ # Answer ARP resolution requests for the destination address.
+ self._reader.expect(
+ expectation = { 'type': EthernetFrame.TYPE_ARP,
+ 'payload': { 'htype': ARPPacket.HTYPE_ETHERNET,
+ 'ptype': ARPPacket.PTYPE_IPV4,
+ 'hlen': ARPPacket.HLEN_ETHERNET,
+ 'plen': ARPPacket.PLEN_IPV4,
+ 'oper': ARPPacket.OPER_REQUEST,
+ 'tpa': socket.inet_pton(self.addr.af, self.addr.remote) }},
+ times = None,
+ action = functools.partial(TapPacketCodec._sendArpReply, self))
+ # Answer Neighbor Solicitation requests for IPv6.
+ self._reader.expect(
+ expectation = {
+ 'type': EthernetFrame.TYPE_IPV6,
+ 'payload': {
+ 'proto': IPv6Packet.PROTO_ICMPV6,
+ 'payload': {
+ 'type': ICMPV6Packet.TYPE_NEIGHBOR_SOLICITATION,
+ 'payload': {
+ 'target': socket.inet_pton(self.addr.af, self.addr.remote) }}}},
+ times = None,
+ action = functools.partial(TapPacketCodec._sendNeighborAdvertisement, self))
diff --git a/test/tuntap/packet_reader.py b/test/tuntap/packet_reader.py
new file mode 100644
index 0000000..f9b90b6
--- /dev/null
+++ b/test/tuntap/packet_reader.py
@@ -0,0 +1,271 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import errno
+import os
+import Queue
+import select
+import signal
+import socket
+import pickle
+import threading
+
+MAX_PACKET_SIZE = 4096
+
+def handleEAgain(fn, *args, **kwargs):
+ """
+ Wraps a function call in loop, restarting on EAGAIN.
+ """
+ while True:
+ try:
+ return fn(*args, **kwargs)
+ except EnvironmentError as e:
+ if e.errno != errno.EAGAIN:
+ raise
+ except:
+ raise
+
+
+class BlockingPacketSource(object):
+ """
+ In order to be able to test blocking reads and not hang forever if the expected data never
+ arrives, we do the blocking read call in a forked subprocess that forwards the data read from
+ the fd over a domain socket.
+ """
+
+ def __init__(self, fd):
+ (self._rsock, wsock) = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
+ child = os.fork()
+ if child != 0:
+ wsock.close()
+ self._child = child
+ return
+
+ self._rsock.close()
+
+ # This is the read loop in the forked process and it won't quit until either the process
+ # gets killed or there is a read error.
+ try:
+ while True:
+ packet = handleEAgain(os.read, fd, MAX_PACKET_SIZE)
+ handleEAgain(wsock.send, pickle.dumps((0, packet), pickle.HIGHEST_PROTOCOL))
+ if len(packet) == 0:
+ break
+ except KeyboardInterrupt:
+ pass
+ except EnvironmentError as e:
+ print "Packet source recevied error: %d" % e.errno
+ handleEAgain(wsock.send, pickle.dumps((e.errno, ''), pickle.HIGHEST_PROTOCOL))
+ finally:
+ os.close(fd)
+ wsock.close()
+ os._exit(os.EX_OK)
+
+ def read(self, killpipe):
+ (r, w, x) = select.select([self._rsock, killpipe], [], [])
+ if killpipe in r:
+ return None
+ if self._rsock in r:
+ try:
+ return handleEAgain(self._rsock.recv, MAX_PACKET_SIZE)
+ except EnvironmentError as e:
+ # If there's a read error on the subprocess, it'll close the socket.
+ if e.errno != errno.ECONNRESET:
+ raise e
+ return None
+
+ def stop(self):
+ os.kill(self._child, signal.SIGINT)
+ os.waitpid(self._child, 0)
+ self._rsock.close()
+
+
+class SelectPacketSource(object):
+ """
+ Reads data from a file descriptor, waiting for input using select().
+ """
+
+ def __init__(self, fd):
+ self._fd = fd
+
+ def read(self, killpipe):
+ (r, w, x) = select.select([self._fd, killpipe], [], [])
+ if killpipe in r:
+ return None
+ if self._fd in r:
+ packet = handleEAgain(os.read, self._fd, MAX_PACKET_SIZE)
+ return pickle.dumps((0, packet))
+ return None
+
+ def stop(self):
+ pass
+
+class Expectation(object):
+ """
+ Describes an expectation. Expectations are specified as dictionaries to match the packet
+ against. Entries may specify nested dictionaries for recursive matching and callables can be
+ used as predicates. Any other entry will be compared to the corresponding value in the packet.
+ """
+
+ def __init__(self, expectation, times, action):
+ self._expectation = expectation
+ self._times = times
+ self._action = action
+
+ @property
+ def active(self):
+ return self._times == None or self.pending
+
+ @property
+ def pending(self):
+ return self._times != None and self._times > 0
+
+ def check(self, packet):
+ #print 'Matching %s against %s' % (packet, self._expectation)
+ if self.active and Expectation._matches(packet, self._expectation):
+ if self._times:
+ self._times -= 1
+ if callable(self._action):
+ self._action(packet)
+ return True
+ return False
+
+ @staticmethod
+ def _matches(packet, expectation):
+ if isinstance(expectation, dict):
+ for (name, entry) in expectation.iteritems():
+ try:
+ val = getattr(packet, name)
+ except AttributeError:
+ return False
+ if not Expectation._matches(val, entry):
+ return False
+ return True
+ elif callable(expectation):
+ return expectation(packet)
+ else:
+ return packet == expectation
+
+
+class PacketReader(object):
+ """
+ Takes care of reading packets and matching them against expectations.
+ """
+
+ def __init__(self, source, decode = str, skip = False):
+ """
+ Initializes a new reader.
+
+ Args:
+ source: packet source to read packets from.
+ decode: packet decoding function.
+ skip: whether non-matching packets are to be skipped.
+ """
+ self._source = source
+ self._decode = decode
+ self._skip = skip
+ self._expectations = []
+ self._packets = Queue.Queue()
+ self._shutdownPipe = os.pipe()
+ self._stop = threading.Event()
+
+ def start(self):
+ self._readThread = threading.Thread(target = self)
+ self._readThread.start()
+
+ def stop(self):
+ self._stop.set()
+ handleEAgain(os.write, self._shutdownPipe[1], 'stop')
+ self._readThread.join()
+ self._source.stop()
+ os.close(self._shutdownPipe[0])
+ os.close(self._shutdownPipe[1])
+
+ def __call__(self):
+ """
+ Reading service function, runs in a separate thread.
+ """
+ try:
+ while True:
+ packet = handleEAgain(self._source.read, self._shutdownPipe[0])
+ if not packet:
+ self._packets.put((0, ''))
+ break
+ self._packets.put(pickle.loads(packet))
+ except EnvironmentError as e:
+ # The read() is racing against stop(), ignore these situations.
+ if e.errno == EIO and self._stop.isSet():
+ self._packets.put((0, ''))
+ self._packets.put((e.errno, ''))
+
+ def expect(self, expectation, times = 1, action = None):
+ """
+ Adds an expectation for a packet to be received.
+
+ Args:
+ expectation: Dictionary describing the expected packet.
+ times: Number of packets expected. None for unlimited.
+ action: A callback to run after the packet has been received.
+ """
+ assert times != 0
+ self._expectations.append(Expectation(expectation, times, action))
+
+ @property
+ def expectationsPending(self):
+ for e in self._expectations:
+ if e.pending:
+ return True
+ return False
+
+ def run(self, timeout = 1):
+ """
+ Runs the packet reader, waiting for all limited expectations to be met.
+
+ Args:
+ timeout: Wait timeout in seconds.
+ """
+ while self.expectationsPending:
+ try:
+ (code, payload) = self._packets.get(True, timeout)
+ except Queue.Empty:
+ # No packet received.
+ break
+
+ if code != 0:
+ # read error, re-raise.
+ raise OSError((code, os.strerror(code)))
+
+ if len(payload) == 0:
+ # EOF on read.
+ break
+
+ # decode the packet and match it against expectation.
+ matches = False
+ for e in self._expectations:
+ if e.check(self._decode(payload)):
+ matches = True
+ break
+ if not matches and not self._skip:
+ return False
+
+ return not self.expectationsPending
+
diff --git a/test/tuntap/route.py b/test/tuntap/route.py
new file mode 100644
index 0000000..b59707e
--- /dev/null
+++ b/test/tuntap/route.py
@@ -0,0 +1,112 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import socket
+import struct
+
+# from net/route.h
+RTM_ADD = 0x1 # Add Route
+RTM_DELETE = 0x2 # Delete Route
+RTM_CHANGE = 0x3 # Change Metrics or flags
+RTM_GET = 0x4 # Report Metrics
+RTM_LOSING = 0x5 # Kernel Suspects Partitioning
+RTM_REDIRECT = 0x6 # Told to use different route
+RTM_MISS = 0x7 # Lookup failed on this address
+RTM_LOCK = 0x8 # fix specified metrics
+RTM_OLDADD = 0x9 # caused by SIOCADDRT
+RTM_OLDDEL = 0xa # caused by SIOCDELRT
+RTM_RESOLVE = 0xb # req to resolve dst to LL addr
+RTM_NEWADDR = 0xc # address being added to iface
+RTM_DELADDR = 0xd # address being removed from iface
+RTM_IFINFO = 0xe # iface going up/down etc.
+RTM_NEWMADDR = 0xf # mcast group membership being added to if
+RTM_DELMADDR = 0x10 # mcast group membership being deleted
+
+RTF_UP = 0x1 # route usable
+RTF_GATEWAY = 0x2 # destination is a gateway
+RTF_HOST = 0x4 # host entry (net otherwise)
+RTF_REJECT = 0x8 # host or net unreachable
+RTF_DYNAMIC = 0x10 # created dynamically (by redirect)
+RTF_MODIFIED = 0x20 # modified dynamically (by redirect)
+RTF_DONE = 0x40 # message confirmed
+RTF_DELCLONE = 0x80 # delete cloned route
+RTF_CLONING = 0x100 # generate new routes on use
+RTF_XRESOLVE = 0x200 # external daemon resolves name
+RTF_LLINFO = 0x400 # generated by link layer (e.g. ARP)
+RTF_STATIC = 0x800 # manually added
+RTF_BLACKHOLE = 0x1000 # just discard pkts (during updates)
+RTF_PROTO2 = 0x4000 # protocol specific routing flag
+RTF_PROTO1 = 0x8000 # protocol specific routing flag
+
+RTF_PRCLONING = 0x10000 # protocol requires cloning
+RTF_WASCLONED = 0x20000 # route generated through cloning
+RTF_PROTO3 = 0x40000 # protocol specific routing flag
+RTF_LOCAL = 0x200000 # route represents a local address
+RTF_BROADCAST = 0x400000 # route represents a bcast address
+RTF_MULTICAST = 0x800000 # route represents a mcast address
+RTF_IFSCOPE = 0x1000000 # has valid interface scope
+RTF_CONDEMNED = 0x2000000 # defunct; no longer modifiable
+
+RTA_DST = 0x1 # destination sockaddr present
+RTA_GATEWAY = 0x2 # gateway sockaddr present
+RTA_NETMASK = 0x4 # netmask sockaddr present
+RTA_GENMASK = 0x8 # cloning mask sockaddr present
+RTA_IFP = 0x10 # interface name sockaddr present
+RTA_IFA = 0x20 # interface addr sockaddr present
+RTA_AUTHOR = 0x40 # sockaddr for author of redirect
+RTA_BRD = 0x80 # for NEWADDR, broadcast or p-p dest addr
+
+RTM_VERSION = 5
+
+PF_ROUTE = 17
+
+STRUCT_RTMSG = struct.Struct('HBBHiiHiiiI3Ii10I')
+
+def _sendRouteMsg(type, index = 0, flags = 0, addrs = {}):
+ def add_addr((addr_flags, payload), (addr, flag)):
+ if not addr:
+ return (addr_flags, payload)
+
+ return (addr_flags | flag, payload + addr.encode())
+
+ (addr_flags, payload) = reduce(add_addr,
+ [ (addrs['dst'], RTA_DST),
+ (addrs['gateway'], RTA_GATEWAY),
+ (addrs['netmask'], RTA_NETMASK) ],
+ (0, ''))
+ msglen = STRUCT_RTMSG.size + len(payload)
+ data = STRUCT_RTMSG.pack(msglen, RTM_VERSION, type, index, flags, addr_flags, *((0,) * 19))
+
+ sock = socket.socket(PF_ROUTE, socket.SOCK_RAW)
+ try:
+ sock.send(data + payload)
+ finally:
+ sock.close()
+
+def addNet(dst = None, gateway = None, netmask = None, interface = None):
+ flags = RTF_STATIC | RTF_UP
+ if gateway:
+ flags |= RTF_GATEWAY
+ elif interface:
+ gateway = interface
+ _sendRouteMsg(type = RTM_ADD, flags = flags,
+ addrs = dict(dst = dst, gateway = gateway, netmask = netmask))
diff --git a/test/tuntap/sockaddr.py b/test/tuntap/sockaddr.py
new file mode 100644
index 0000000..59edbfc
--- /dev/null
+++ b/test/tuntap/sockaddr.py
@@ -0,0 +1,124 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import socket
+import struct
+
+class SockaddrDl(object):
+
+ AF_LINK = 18
+ STRUCT = struct.Struct('BBH4B')
+
+ def __init__(self, name, addr, type, index = 0, af = AF_LINK):
+ self.af = af
+ self.index = index
+ self.type = type
+ self.name = name
+ self.addr = addr
+
+ def __repr__(self):
+ return 'SockaddrDl<%d, %d, %d, %s, %s>' % (self.af, self.index, self.type,
+ self.name, repr(self.addr))
+
+ def __eq__(self, other):
+ return (self.af == other.af and self.index == other.index and self.type == other.type and
+ self.name == other.name and self.addr == other.addr)
+
+ def encode(self):
+ # It's important to make this size 12 at least to meet sizeof(struct sockaddr_dl), routing
+ # setup chokes if it's not.
+ datalen = max(len(self.name) + len(self.addr), 12)
+ namelen = datalen - len(self.addr)
+ data = SockaddrDl.STRUCT.pack(SockaddrDl.STRUCT.size + datalen,
+ self.af, self.index, self.type,
+ namelen, len(self.addr), 0)
+ return data + self.name + '\x00' * (namelen - len(self.name)) + self.addr
+
+ @classmethod
+ def decode(self, data):
+ fields = SockaddrDl.STRUCT.unpack_from(data)
+ pname = SockaddrDl.STRUCT.size
+ paddr = pname + fields[4]
+ pend = paddr + fields[5]
+ return SockaddrDl(af = fields[1], index = fields[2], type = fields[3],
+ name = data[pname:paddr], addr = data[paddr:pend])
+
+
+class SockaddrIn(object):
+ """
+ Python wrapper for struct sockaddr_in.
+ """
+
+ STRUCT = struct.Struct('BBH4s8x')
+
+ def __init__(self, addr, port = 0, af = socket.AF_INET):
+ self.addr = addr or '0.0.0.0'
+ self.port = port
+ self.af = af
+
+ def __repr__(self):
+ return 'SockaddrIn<%d, %d, %s>' % (self.af, self.port, self.addr)
+
+ def __eq__(self, other):
+ return self.encode() == other.encode()
+
+ def encode(self):
+ return SockaddrIn.STRUCT.pack(16, self.af, self.port, socket.inet_aton(self.addr))
+
+ @classmethod
+ def decode(cls, data):
+ t = SockaddrIn.STRUCT.unpack(data)
+ return SockaddrIn(addr = socket.inet_ntoa(t[3]), port = t[2], af = t[1])
+
+
+class SockaddrIn6(object):
+ """
+ Python wrapper for struct sockaddr_in6.
+ """
+
+ STRUCT = struct.Struct('BBHI16sI')
+
+ def __init__(self, addr, port = 0, af = socket.AF_INET6, flowinfo = 0, scopeid = 0):
+ self.addr = addr or '::0'
+ self.port = port
+ self.af = af
+ self.flowinfo = flowinfo
+ self.scopeid = scopeid
+
+ def __repr__(self):
+ return 'SockaddrIn6<%d, %d, %s, %d, %d>' % (self.af, self.port, self.addr,
+ self.flowinfo, self.scopeid)
+
+ def __eq__(self, other):
+ return self.encode() == other.encode()
+
+ def encode(self):
+ return SockaddrIn6.STRUCT.pack(28, self.af, self.port, self.flowinfo,
+ socket.inet_pton(socket.AF_INET6, self.addr), self.scopeid)
+
+ @classmethod
+ def decode(cls, data):
+ t = SockaddrIn6.STRUCT.unpack(data)
+ return SockaddrIn6(addr = socket.inet_ntop(socket.AF_INET6, t[4]), port = t[2], af = t[1],
+ flowinfo = t[3], scopeid = t[5])
+
+
diff --git a/test/tuntap/test_char_dev.py b/test/tuntap/test_char_dev.py
new file mode 100644
index 0000000..ae34bf7
--- /dev/null
+++ b/test/tuntap/test_char_dev.py
@@ -0,0 +1,86 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import errno
+import os
+from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness
+from unittest import TestCase
+
+class TestCharDev(TestCase):
+
+ def __init__(self, name, newHarness):
+ super(TestCharDev, self).__init__(name)
+ self._newHarness = newHarness
+
+ def setUp(self):
+ self.char_dev = self._newHarness()
+ self.char_dev.open()
+
+ def tearDown(self):
+ self.char_dev.close()
+
+ def test_Open(self):
+ pass
+
+ def test_OpenTwiceBusy(self):
+ second = self._newHarness(self.char_dev.unit)
+ try:
+ second.open()
+ second.close()
+ self.fail()
+ except OSError as e:
+ self.assertEqual(errno.EBUSY, e.errno)
+
+ def test_ReadFails(self):
+ try:
+ os.read(self.char_dev.fileno(), 1)
+ self.fail()
+ except OSError as e:
+ self.assertEqual(errno.EIO, e.errno)
+
+ def test_WriteFails(self):
+ try:
+ os.write(self.char_dev.fileno(), '')
+ self.fail()
+ except OSError as e:
+ self.assertEqual(errno.EIO, e.errno)
+
+
+class TestTunCharDev(TestCharDev):
+
+ def __init__(self, name):
+ super(TestTunCharDev, self).__init__(name, TunCharDevHarness)
+
+ def test_AFPrepend(self):
+ self.assertFalse(self.char_dev.prependAF)
+
+ self.char_dev.prependAF = 1
+ self.assertTrue(self.char_dev.prependAF)
+
+ self.char_dev.prependAF = 0
+ self.assertFalse(self.char_dev.prependAF)
+
+
+class TestTapCharDev(TestCharDev):
+
+ def __init__(self, name):
+ super(TestTapCharDev, self).__init__(name, TapCharDevHarness)
diff --git a/test/tuntap/test_interface.py b/test/tuntap/test_interface.py
new file mode 100644
index 0000000..7cf19b2
--- /dev/null
+++ b/test/tuntap/test_interface.py
@@ -0,0 +1,120 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import errno
+import socket
+import unittest
+
+from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness
+from tuntap.interface_harness import Address, InterfaceHarness
+from tuntap.sockaddr import SockaddrDl, SockaddrIn, SockaddrIn6
+from tuntap.tun_tap_harness import TunHarness, TapHarness
+
+class TestInterface(unittest.TestCase):
+
+ def __init__(self, name, harness):
+ super(TestInterface, self).__init__(name)
+ self.harness = harness
+
+ def setUp(self):
+ self.harness.start()
+
+ def tearDown(self):
+ self.harness.stop()
+
+ def test_CloseWhileUp(self):
+ self.harness.interface.flags |= InterfaceHarness.IFF_UP
+ self.harness.char_dev.close()
+ self.harness.start()
+
+ def test_UpDown(self):
+ self.harness.interface.flags |= InterfaceHarness.IFF_UP
+ self.assertEquals(InterfaceHarness.IFF_UP,
+ self.harness.interface.flags & InterfaceHarness.IFF_UP)
+ self.harness.interface.flags &= ~InterfaceHarness.IFF_UP
+ self.assertEquals(0,
+ self.harness.interface.flags & InterfaceHarness.IFF_UP)
+
+ def test_NetmaskAFFix(self):
+ self.harness.interface.addIfAddr(local = self.harness.addr.sa_local,
+ dst = self.harness.addr.sa_dst,
+ mask = SockaddrIn(af = 0, addr = self.harness.addr.mask))
+ for addr in self.harness.interface.getAddrs(socket.AF_INET):
+ if addr[1] == self.harness.addr.sa_mask:
+ return;
+ self.fail()
+
+ def test_Address(self):
+ self.harness.interface.addIfAddr(local = self.harness.addr.sa_local,
+ dst = self.harness.addr.sa_dst,
+ mask = self.harness.addr.sa_mask)
+ for addr in self.harness.interface.getAddrs(socket.AF_INET):
+ if (addr[0] == self.harness.addr.sa_local and
+ addr[1] == self.harness.addr.sa_mask and
+ addr[2] == self.harness.addr.sa_dst):
+ return
+ self.fail()
+
+ def test_Address6(self):
+ def compare(expected, actual):
+ return (expected or SockaddrIn6(af = 0, addr = None)) == actual
+
+ self.harness.interface.addIfAddr6(local = self.harness.addr6.sa_local,
+ dst = self.harness.addr6.sa_dst,
+ mask = self.harness.addr6.sa_mask)
+ for addr in self.harness.interface.getAddrs(socket.AF_INET6):
+ if (compare(addr[0], self.harness.addr6.sa_local) and
+ compare(addr[1], self.harness.addr6.sa_mask) and
+ compare(addr[2], self.harness.addr6.sa_dst)):
+ return
+ self.fail()
+
+
+class TestTunInterface(TestInterface):
+
+ def __init__(self, name):
+ super(TestTunInterface, self).__init__(name, TunHarness())
+
+ def test_Flags(self):
+ self.assertEquals(InterfaceHarness.IFF_POINTOPOINT |
+ InterfaceHarness.IFF_RUNNING |
+ InterfaceHarness.IFF_SIMPLEX |
+ InterfaceHarness.IFF_MULTICAST,
+ self.harness.interface.flags)
+
+
+class TestTapInterface(TestInterface):
+
+ def __init__(self, name):
+ super(TestTapInterface, self).__init__(name, TapHarness())
+
+ def test_Flags(self):
+ self.assertEquals(InterfaceHarness.IFF_BROADCAST |
+ InterfaceHarness.IFF_RUNNING |
+ InterfaceHarness.IFF_SIMPLEX |
+ InterfaceHarness.IFF_MULTICAST,
+ self.harness.interface.flags)
+
+ def test_SetLladdr(self):
+ addr = SockaddrDl(name = '', addr = '\x11\x22\x33\x44\x55\x66', type = 0)
+ self.harness.interface.lladdr = addr
+ self.assertEquals(addr.addr, self.harness.interface.lladdr.addr)
diff --git a/test/tuntap/test_ip.py b/test/tuntap/test_ip.py
new file mode 100644
index 0000000..7043860
--- /dev/null
+++ b/test/tuntap/test_ip.py
@@ -0,0 +1,234 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import functools
+import socket
+import struct
+from unittest import TestCase
+
+from tuntap.packet import IPv4Packet, IPv6Packet, UDPPacket
+from tuntap.packet_codec import TapPacketCodec
+from tuntap.packet_reader import SelectPacketSource
+
+class TestIO(TestCase):
+
+ def __init__(self, name, af, listenAddress, codec):
+ super(TestIO, self).__init__(name)
+ self._codec = codec(af, listenAddress);
+
+ def __str__(self):
+ return '%s [%s]' % (super(TestIO, self).__str__(), str(self._codec))
+
+ def setUp(self):
+ super(TestIO, self).setUp()
+ self._codec.start()
+
+ def tearDown(self):
+ self._codec.stop()
+ super(TestIO, self).tearDown()
+
+
+class TestIp(TestIO):
+
+ def __init__(self, name, codec):
+ super(TestIp, self).__init__(name, socket.AF_INET, None, codec)
+
+ def test_Send(self):
+ payload = 'knock, knock!'
+ port = 12345
+ self._codec.sendUDP(payload, (self._codec.addr.remote, port))
+ self._codec.expectPacket(
+ { 'version': 4,
+ 'src': socket.inet_pton(self._codec.af, self._codec.addr.local),
+ 'dst': socket.inet_pton(self._codec.af, self._codec.addr.remote),
+ 'proto': IPv4Packet.PROTO_UDP,
+ 'payload': { 'dst': port,
+ 'payload': payload } })
+ self.assertTrue(self._codec.runPacket())
+
+ def test_Recv(self):
+ srcport = 23456
+ payload = 'who\'s there?'
+ packet = IPv4Packet(proto = IPv4Packet.PROTO_UDP,
+ src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
+ dst = socket.inet_pton(self._codec.af, self._codec.addr.local),
+ payload = UDPPacket(src = srcport,
+ dst = self._codec.UDPPort,
+ payload = payload))
+ self._codec.sendPacket(packet.encode())
+ self._codec.expectUDP(payload)
+ self.assertTrue(self._codec.runUDP())
+
+ def test_RecvMTUSize(self):
+ # Send a payload that's just within the MTU limit.
+ payload = '\xff' * (self._codec._harness.interface.mtu -
+ IPv4Packet().headerLen - UDPPacket().headerLen)
+ srcport = 23456
+ packet = IPv4Packet(proto = IPv4Packet.PROTO_UDP,
+ src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
+ dst = socket.inet_pton(self._codec.af, self._codec.addr.local),
+ payload = UDPPacket(src = srcport,
+ dst = self._codec.UDPPort,
+ payload = payload))
+ assert len(packet.encode()) == self._codec._harness.interface.mtu
+ self._codec.sendPacket(packet.encode())
+ self._codec.expectUDP(payload)
+ self.assertTrue(self._codec.runUDP())
+
+
+class TestIp6(TestIO):
+
+ def __init__(self, name, codec):
+ super(TestIp6, self).__init__(name, socket.AF_INET6, None, codec)
+
+ def test_Send(self):
+ payload = 'knock, knock!'
+ port = 12345
+ self._codec.sendUDP(payload, (self._codec.addr.remote, port))
+ self._codec.expectPacket(
+ { 'version': 6,
+ 'src': socket.inet_pton(self._codec.af, self._codec.addr.local),
+ 'dst': socket.inet_pton(self._codec.af, self._codec.addr.remote),
+ 'proto': IPv6Packet.PROTO_UDP,
+ 'payload': { 'dst': port,
+ 'payload': payload } })
+ self.assertTrue(self._codec.runPacket())
+
+ def test_Recv(self):
+ srcport = 23456
+ payload = 'who\'s there?'
+ packet = IPv6Packet(proto = IPv6Packet.PROTO_UDP,
+ src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
+ dst = socket.inet_pton(self._codec.af, self._codec.addr.local),
+ payload = UDPPacket(src = srcport,
+ dst = self._codec.UDPPort,
+ payload = payload))
+ self._codec.sendPacket(packet.encode())
+ self._codec.expectUDP(payload)
+ self.assertTrue(self._codec.runUDP())
+
+
+class TestMulticast(TestIO):
+
+ MULTICAST_GROUP = '224.1.2.3'
+
+ def __init__(self, name, codec):
+ super(TestMulticast, self).__init__(name, socket.AF_INET, TestMulticast.MULTICAST_GROUP,
+ codec)
+
+ def setUp(self):
+ super(TestMulticast, self).setUp()
+ mreq = struct.pack('4s4s',
+ socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP),
+ socket.inet_pton(self._codec.af, self._codec.addr.local))
+ self._codec._recvSock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+ self._codec._sendSock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+ self._codec._sendSock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
+ socket.inet_pton(self._codec.af, self._codec.addr.local))
+
+ def test_Send(self):
+ payload = 'knock, knock!'
+ port = 12345
+ self._codec.sendUDP(payload, (TestMulticast.MULTICAST_GROUP, port))
+ self._codec.expectPacket(
+ { 'version': 4,
+ 'src': socket.inet_pton(self._codec.af, self._codec.addr.local),
+ 'dst': socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP),
+ 'proto': IPv4Packet.PROTO_UDP,
+ 'payload': { 'dst': port,
+ 'payload': payload } })
+ self.assertTrue(self._codec.runPacket())
+
+ def test_Recv(self):
+ srcport = 23456
+ payload = 'who\'s there?'
+ packet = IPv4Packet(proto = IPv4Packet.PROTO_UDP,
+ src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
+ dst = socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP),
+ payload = UDPPacket(src = srcport,
+ dst = self._codec.UDPPort,
+ payload = payload))
+ self._codec.sendPacket(packet.encode())
+ self._codec.expectUDP(payload)
+ self.assertTrue(self._codec.runUDP())
+
+
+class TestMulticast6(TestIO):
+
+ MULTICAST_GROUP = 'ff05::114'
+
+ def __init__(self, name, codec):
+ super(TestMulticast6, self).__init__(name, socket.AF_INET6, TestMulticast6.MULTICAST_GROUP,
+ codec)
+
+ def setUp(self):
+ super(TestMulticast6, self).setUp()
+ mreq = struct.pack('16sI',
+ socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP),
+ self._codec._harness.interface.index)
+ self._codec._recvSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
+ self._codec._sendSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
+ self._codec._sendSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF,
+ self._codec._harness.interface.index)
+
+ def test_Send(self):
+ payload = 'knock, knock!'
+ port = 12345
+ self._codec.sendUDP(payload, (TestMulticast6.MULTICAST_GROUP, port))
+ self._codec.expectPacket(
+ { 'version': 6,
+ 'dst': socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP),
+ 'proto': IPv6Packet.PROTO_UDP,
+ 'payload': { 'dst': port,
+ 'payload': payload } })
+ self.assertTrue(self._codec.runPacket())
+
+ def test_Recv(self):
+ srcport = 23456
+ payload = 'who\'s there?'
+ packet = IPv6Packet(proto = IPv6Packet.PROTO_UDP,
+ src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
+ dst = socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP),
+ payload = UDPPacket(src = srcport,
+ dst = self._codec.UDPPort,
+ payload = payload))
+ self._codec.sendPacket(packet.encode())
+ self._codec.expectUDP(payload)
+ self.assertTrue(self._codec.runUDP())
+
+
+class TestTapLladdr(TestIp):
+
+ def __init__(self, name):
+ super(TestTapLladdr, self).__init__(name,
+ lambda af, addr: TapPacketCodec(af, addr,
+ SelectPacketSource))
+
+ def setUp(self):
+ super(TestTapLladdr, self).setUp()
+
+ # Swap out the link-level address with a different address.
+ lladdr = self._codec._harness.interface.lladdr
+ mac_addr = list(lladdr.addr)
+ mac_addr[5] = chr(ord(mac_addr[5]) ^ 0xff)
+ lladdr.addr = ''.join(mac_addr)
+ self._codec._harness.interface.lladdr = lladdr
diff --git a/test/tuntap/tun_tap_harness.py b/test/tuntap/tun_tap_harness.py
new file mode 100644
index 0000000..cb07638
--- /dev/null
+++ b/test/tuntap/tun_tap_harness.py
@@ -0,0 +1,96 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import errno
+import socket
+
+from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness
+from tuntap.interface_harness import Address, InterfaceHarness
+import tuntap.route
+
+class TunTapHarness(object):
+
+ def __init__(self, name, newCharDevHarness, addr, addr6):
+ self._newCharDevHarness = newCharDevHarness
+ self.name = name
+ self.addr = addr
+ self.addr6 = addr6
+
+ def start(self):
+ self.char_dev = self._newCharDevHarness()
+ self.char_dev.open()
+ self.interface = InterfaceHarness(self.name, self.char_dev.unit)
+
+ def up(self):
+ self.interface.addIfAddr(local = self.addr.sa_local,
+ dst = self.addr.sa_dst,
+ mask = self.addr.sa_mask)
+ self.interface.addIfAddr6(local = self.addr6.sa_local,
+ dst = self.addr6.sa_dst,
+ mask = self.addr6.sa_mask)
+
+ # Lion automatically creates routes for IPv6 addresses, earlier versions don't.
+ try:
+ tuntap.route.addNet(dst = self.addr6.sa_remote,
+ netmask = self.addr6.sa_mask,
+ interface = self.interface.lladdr)
+ except IOError as e:
+ if e.errno != errno.EEXIST:
+ raise e
+
+ self.interface.flags |= InterfaceHarness.IFF_UP
+
+ def stop(self):
+ self.interface.flags &= ~InterfaceHarness.IFF_UP
+ self.char_dev.close()
+
+
+class TunHarness(TunTapHarness):
+
+ def __init__(self,
+ addr = Address(af = socket.AF_INET,
+ local = '10.0.0.1',
+ remote = '10.0.0.2',
+ dst = '10.0.0.2',
+ mask = '255.255.255.255'),
+ addr6 = Address(af = socket.AF_INET6,
+ local = 'fd00::1',
+ remote = 'fd00::2',
+ dst = 'fd00::2',
+ mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')):
+ super(TunHarness, self).__init__('tun', TunCharDevHarness, addr, addr6)
+
+
+class TapHarness(TunTapHarness):
+
+ def __init__(self,
+ addr = Address(af = socket.AF_INET,
+ local = '10.0.0.1',
+ remote = '10.0.0.2',
+ dst = '10.255.255.255',
+ mask = '255.0.0.0'),
+ addr6 = Address(af = socket.AF_INET6,
+ local = 'fd00::1',
+ remote = 'fd00::2',
+ dst = None,
+ mask = 'ffff:ffff:ffff:ffff::0')):
+ super(TapHarness, self).__init__('tap', TapCharDevHarness, addr, addr6)
diff --git a/test/tuntap/tun_tap_test_case.py b/test/tuntap/tun_tap_test_case.py
new file mode 100644
index 0000000..28edc46
--- /dev/null
+++ b/test/tuntap/tun_tap_test_case.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+from unittest import TestCase
+
+class TunTapTestCase(TestCase):
+
+ def __init__(self, name, harness):
+ super(TunTapTestCase, self).__init__(name)
+ self.harness = harness
+
+ def __str__(self):
+ return '%s [%s]' % (super(TunTapTestCase, self).__str__(),
+ self.harness.__class__.__name__)
+
+ def setUp(self):
+ self.harness.start()
+ self.harness.up()
+
+ def tearDown(self):
+ self.harness.stop()
diff --git a/test/tuntap/tuntap_tests.py b/test/tuntap/tuntap_tests.py
new file mode 100644
index 0000000..fb5a431
--- /dev/null
+++ b/test/tuntap/tuntap_tests.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
+#
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
+# conditions and the following disclaimer in the documentation and/or other materials provided
+# with the distribution.
+# 3. The name of the author may not be used to endorse or promote products derived from this
+# software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
+# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
+# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import itertools
+import re
+import sys
+import unittest
+
+from tuntap.packet_codec import TunPacketCodec, TunAFPacketCodec, TapPacketCodec
+from tuntap.packet_reader import BlockingPacketSource, SelectPacketSource
+
+from tuntap.test_char_dev import TestTunCharDev, TestTapCharDev
+from tuntap.test_interface import TestTunInterface, TestTapInterface
+from tuntap.test_ip import TestIp, TestIp6, TestMulticast, TestMulticast6, TestTapLladdr
+
+class FilteringTestSuite(unittest.TestSuite):
+
+ def __init__(self, filter):
+ super(FilteringTestSuite, self).__init__()
+ self._matcher = re.compile(filter or '.*')
+
+ def __iter__(self):
+ return itertools.ifilter(lambda test : self._matcher.search(str(test)),
+ super(FilteringTestSuite, self).__iter__())
+
+def loadTestsFromTestCase(testCaseClass, *args, **kwargs):
+ testCaseNames = unittest.getTestCaseNames(testCaseClass, 'test_')
+ return unittest.TestSuite(map(lambda n : testCaseClass(n, *args, **kwargs), testCaseNames))
+
+def main(argv):
+ # Parse the command line.
+ parser = argparse.ArgumentParser(description = 'Run tuntap unit tests.')
+ parser.add_argument('--tests', type = str, nargs = '?', default = None,
+ help = 'tests to run')
+ parser.add_argument('--verbosity', type = int, nargs = '?', default = 2,
+ help = 'verbosity level')
+ options = parser.parse_args(argv[1:])
+
+ # Gather tests and run them.
+ loader = unittest.TestLoader()
+ suite = FilteringTestSuite(options.tests)
+ suite.addTests(loadTestsFromTestCase(TestTunCharDev))
+ suite.addTests(loadTestsFromTestCase(TestTapCharDev))
+ suite.addTests(loadTestsFromTestCase(TestTunInterface))
+ suite.addTests(loadTestsFromTestCase(TestTapInterface))
+
+ codecs = (TunPacketCodec, TunAFPacketCodec, TapPacketCodec)
+ sources = (SelectPacketSource, BlockingPacketSource)
+ tests = (TestIp, TestIp6, TestMulticast, TestMulticast6)
+ for (test, codec, source) in [ (test, codec, source) for test in tests
+ for codec in codecs
+ for source in sources ]:
+ suite.addTests(loadTestsFromTestCase(test, lambda af, addr: codec(af, addr, source)))
+
+ suite.addTests(loadTestsFromTestCase(TestTapLladdr))
+
+ runner = unittest.TextTestRunner(stream = sys.stderr,
+ descriptions = True,
+ verbosity = options.verbosity)
+ runner.run(suite)
+
+if __name__ == '__main__':
+ main(sys.argv)