Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F108377380
D38122.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
32 KB
Referenced Files
None
Subscribers
None
D38122.diff
View Options
diff --git a/tests/sys/netpfil/common/pft_ping.py b/tests/sys/netpfil/common/pft_ping.py
--- a/tests/sys/netpfil/common/pft_ping.py
+++ b/tests/sys/netpfil/common/pft_ping.py
@@ -3,6 +3,7 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
+# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
@@ -29,333 +30,505 @@
import argparse
import logging
logging.getLogger("scapy").setLevel(logging.CRITICAL)
+import math
import scapy.all as sp
-import socket
import sys
-from sniffer import Sniffer
-PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')
+from copy import copy
+from sniffer import Sniffer
-dup_found = 0
-
-def check_dup(args, packet):
- """
- Verify that this is an ICMP packet, and that we only see one
- """
- global dup_found
-
- icmp = packet.getlayer(sp.ICMP)
- if not icmp:
- return False
-
- raw = packet.getlayer(sp.Raw)
- if not raw:
- return False
- if raw.load != PAYLOAD_MAGIC:
- return False
-
- dup_found = dup_found + 1
- return False
-
-def check_ping_request(args, packet):
- if args.ip6:
- return check_ping6_request(args, packet)
- else:
- return check_ping4_request(args, packet)
-
-def check_ping4_request(args, packet):
- """
- Verify that the packet matches what we'd have sent
- """
- dst_ip = args.to[0]
-
- ip = packet.getlayer(sp.IP)
- if not ip:
- return False
- if ip.dst != dst_ip:
- return False
-
- icmp = packet.getlayer(sp.ICMP)
- if not icmp:
- return False
- if sp.icmptypes[icmp.type] != 'echo-request':
- return False
-
- raw = packet.getlayer(sp.Raw)
- if not raw:
- return False
- if raw.load != PAYLOAD_MAGIC:
- return False
-
- # Wait to check expectations until we've established this is the packet we
- # sent.
- if args.expect_tos:
- if ip.tos != int(args.expect_tos[0]):
- print("Unexpected ToS value %d, expected %d" \
- % (ip.tos, int(args.expect_tos[0])))
- return False
-
- return True
-
-def check_ping6_request(args, packet):
- """
- Verify that the packet matches what we'd have sent
- """
- dst_ip = args.to[0]
-
- ip = packet.getlayer(sp.IPv6)
- if not ip:
- return False
- if ip.dst != dst_ip:
- return False
-
- icmp = packet.getlayer(sp.ICMPv6EchoRequest)
- if not icmp:
- return False
- if icmp.data != PAYLOAD_MAGIC:
- return False
-
- # Wait to check expectations until we've established this is the packet we
- # sent.
- if args.expect_tc:
- if ip.tc != int(args.expect_tc[0]):
- print("Unexpected traffic class value %d, expected %d" \
- % (ip.tc, int(args.expect_tc[0])))
- return False
-
- return True
-
-def check_ping_reply(args, packet):
- if args.ip6:
- return check_ping6_reply(args, packet)
- else:
- return check_ping4_reply(args, packet)
-
-def check_ping4_reply(args, packet):
- """
- Check that this is a reply to the ping request we sent
- """
- dst_ip = args.to[0]
-
- ip = packet.getlayer(sp.IP)
- if not ip:
- return False
- if ip.src != dst_ip:
- return False
-
- icmp = packet.getlayer(sp.ICMP)
- if not icmp:
- return False
- if sp.icmptypes[icmp.type] != 'echo-reply':
- return False
-
- raw = packet.getlayer(sp.Raw)
- if not raw:
- return False
- if raw.load != PAYLOAD_MAGIC:
- return False
-
- if args.expect_tos:
- if ip.tos != int(args.expect_tos[0]):
- print("Unexpected ToS value %d, expected %d" \
- % (ip.tos, int(args.expect_tos[0])))
- return False
-
- return True
-
-def check_ping6_reply(args, packet):
- """
- Check that this is a reply to the ping request we sent
- """
- dst_ip = args.to[0]
-
- ip = packet.getlayer(sp.IPv6)
- if not ip:
- return False
- if ip.src != dst_ip:
- return False
-
- icmp = packet.getlayer(sp.ICMPv6EchoReply)
- if not icmp:
- print("No echo reply!")
- return False
-
- if icmp.data != PAYLOAD_MAGIC:
- print("data mismatch")
- return False
-
- if args.expect_tc:
- if ip.tc != int(args.expect_tc[0]):
- print("Unexpected traffic class value %d, expected %d" \
- % (ip.tc, int(args.expect_tc[0])))
- return False
-
- return True
-
-def ping(send_if, dst_ip, args):
- ether = sp.Ether()
- ip = sp.IP(dst=dst_ip)
- icmp = sp.ICMP(type='echo-request')
- raw = sp.raw(PAYLOAD_MAGIC)
-
- if args.send_tos:
- ip.tos = int(args.send_tos[0])
-
- if args.fromaddr:
- ip.src = args.fromaddr[0]
-
- req = ether / ip / icmp / raw
- sp.sendp(req, iface=send_if, verbose=False)
-
-def ping6(send_if, dst_ip, args):
- ether = sp.Ether()
- ip6 = sp.IPv6(dst=dst_ip)
- icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))
-
- if args.send_tc:
- ip6.tc = int(args.send_tc[0])
-
- if args.fromaddr:
- ip6.src = args.fromaddr[0]
-
- req = ether / ip6 / icmp
- sp.sendp(req, iface=send_if, verbose=False)
-
-def check_tcpsyn(args, packet):
- dst_ip = args.to[0]
-
- ip = packet.getlayer(sp.IP)
- if not ip:
- return False
- if ip.dst != dst_ip:
- return False
-
- tcp = packet.getlayer(sp.TCP)
- if not tcp:
- return False
-
- # Verify IP checksum
- chksum = ip.chksum
- ip.chksum = None
- new_chksum = sp.IP(sp.raw(ip)).chksum
- if chksum != new_chksum:
- print("Expected IP checksum %x but found %x\n" % (new_cshkum, chksum))
- return False
-
- # Verify TCP checksum
- chksum = tcp.chksum
- packet_raw = sp.raw(packet)
- tcp.chksum = None
- newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
- new_chksum = newpacket[sp.TCP].chksum
- if chksum != new_chksum:
- print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
- return False
-
- return True
+logging.basicConfig(format='%(message)s')
+LOGGER = logging.getLogger(__name__)
-def tcpsyn(send_if, dst_ip, args):
- opts=[('Timestamp', (1, 1)), ('MSS', 1280)]
-
- if args.tcpopt_unaligned:
- opts = [('NOP', 0 )] + opts
+PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')
- ether = sp.Ether()
- ip = sp.IP(dst=dst_ip)
- tcp = sp.TCP(dport=666, flags='S', options=opts)
-
- req = ether / ip / tcp
- sp.sendp(req, iface=send_if, verbose=False)
+def build_payload(l):
+ pl = len(PAYLOAD_MAGIC)
+ ret = PAYLOAD_MAGIC * math.floor(l/pl)
+ ret += PAYLOAD_MAGIC[0:(l % pl)]
+ return ret
+
+
+def prepare_ipv6(dst_address, send_params):
+ src_address = send_params.get('src_address')
+ hlim = send_params.get('hlim')
+ tc = send_params.get('tc')
+ ip6 = sp.IPv6(dst=dst_address)
+ if src_address:
+ ip6.src = src_address
+ if hlim:
+ ip6.hlim = hlim
+ if tc:
+ ip6.tc = tc
+ return ip6
+
+
+def prepare_ipv4(dst_address, send_params):
+ src_address = send_params.get('src_address')
+ flags = send_params.get('flags')
+ tos = send_params.get('tc')
+ ttl = send_params.get('hlim')
+ ip = sp.IP(dst=dst_address)
+ if src_address:
+ ip.src = src_address
+ if flags:
+ ip.flags = flags
+ if tos:
+ ip.tos = tos
+ if ttl:
+ ip.ttl = ttl
+ return ip
+
+
+def send_icmp_ping(dst_address, sendif, send_params):
+ send_length = send_params['length']
+ ether = sp.Ether()
+ if ':' in dst_address:
+ ip6 = prepare_ipv6(dst_address, send_params)
+ icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length)))
+ req = ether / ip6 / icmp
+ else:
+ ip = prepare_ipv4(dst_address, send_params)
+ icmp = sp.ICMP(type='echo-request')
+ raw = sp.raw(build_payload(send_length))
+ req = ether / ip / icmp / raw
+ sp.sendp(req, sendif, verbose=False)
+
+
+def send_tcp_syn(dst_address, sendif, send_params):
+ tcpopt_unaligned = send_params.get('tcpopt_unaligned')
+ seq = send_params.get('seq')
+ mss = send_params.get('mss')
+ ether = sp.Ether()
+ opts=[('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
+ if tcpopt_unaligned:
+ opts = [('NOP', 0 )] + opts
+ if ':' in dst_address:
+ ip = prepare_ipv6(dst_address, send_params)
+ else:
+ ip = prepare_ipv4(dst_address, send_params)
+ tcp = sp.TCP(dport=666, flags='S', options=opts, seq=seq)
+ req = ether / ip / tcp
+ sp.sendp(req, iface=sendif, verbose=False)
+
+
+def send_ping(dst_address, sendif, ping_type, send_params):
+ if ping_type == 'icmp':
+ send_icmp_ping(dst_address, sendif, send_params)
+ elif ping_type == 'tcpsyn':
+ send_tcp_syn(dst_address, sendif, send_params)
+ else:
+ raise Exception('Unspported ping type')
+
+
+def check_ipv4(expect_params, packet):
+ src_address = expect_params.get('src_address')
+ dst_address = expect_params.get('dst_address')
+ flags = expect_params.get('flags')
+ tos = expect_params.get('tc')
+ ttl = expect_params.get('hlim')
+ ip = packet.getlayer(sp.IP)
+ if not ip:
+ LOGGER.debug('Packet is not IPv4!')
+ return False
+ if src_address and ip.src != src_address:
+ LOGGER.debug('Source IPv4 address does not match!')
+ return False
+ if dst_address and ip.dst != dst_address:
+ LOGGER.debug('Destination IPv4 address does not match!')
+ return False
+ chksum = ip.chksum
+ ip.chksum = None
+ new_chksum = sp.IP(sp.raw(ip)).chksum
+ if chksum != new_chksum:
+ LOGGER.debug(f'Expected IP checksum {new_chksum} but found {chksum}')
+ return False
+ if flags and ip.flags != flags:
+ LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
+ return False
+ if tos and ip.tos != tos:
+ LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
+ return False
+ if ttl and ip.ttl != ttl:
+ LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
+ return False
+ return True
+
+
+def check_ipv6(expect_params, packet):
+ src_address = expect_params.get('src_address')
+ dst_address = expect_params.get('dst_address')
+ flags = expect_params.get('flags')
+ hlim = expect_params.get('hlim')
+ tc = expect_params.get('tc')
+ ip6 = packet.getlayer(sp.IPv6)
+ if not ip6:
+ LOGGER.debug('Packet is not IPv6!')
+ return False
+ if src_address and ip6.src != src_address:
+ LOGGER.debug('Source IPv6 address does not match!')
+ return False
+ if dst_address and ip6.dst != dst_address:
+ LOGGER.debug('Destination IPv6 address does not match!')
+ return False
+ # IPv6 has no IP-level checksum.
+ if flags:
+ raise Exception("There's no fragmentation flags in IPv6")
+ if hlim and ip6.hlim != hlim:
+ LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
+ return False
+ if tc and ip6.tc != tc:
+ LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
+ return False
+ return True
+
+
+def check_ping_4(expect_params, packet):
+ expect_length = expect_params['length']
+ if not check_ipv4(expect_params, packet):
+ return False
+ icmp = packet.getlayer(sp.ICMP)
+ if not icmp:
+ LOGGER.debug('Packet is not IPv4 ICMP!')
+ return False
+ raw = packet.getlayer(sp.Raw)
+ if not raw:
+ LOGGER.debug('Packet contains no payload!')
+ return False
+ if raw.load != build_payload(expect_length):
+ LOGGER.debug('Payload magic does not match!')
+ return False
+ return True
+
+
+def check_ping_request_4(expect_params, packet):
+ if not check_ping_4(expect_params, packet):
+ return False
+ icmp = packet.getlayer(sp.ICMP)
+ if sp.icmptypes[icmp.type] != 'echo-request':
+ LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
+ return False
+ return True
+
+
+def check_ping_reply_4(expect_params, packet):
+ if not check_ping_4(expect_params, packet):
+ return False
+ icmp = packet.getlayer(sp.ICMP)
+ if sp.icmptypes[icmp.type] != 'echo-reply':
+ LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
+ return False
+ return True
+
+
+def check_ping_request_6(expect_params, packet):
+ expect_length = expect_params['length']
+ if not check_ipv6(expect_params, packet):
+ return False
+ icmp = packet.getlayer(sp.ICMPv6EchoRequest)
+ if not icmp:
+ LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
+ return False
+ if icmp.data != build_payload(expect_length):
+ LOGGER.debug('Payload magic does not match!')
+ return False
+ return True
+
+
+def check_ping_reply_6(expect_params, packet):
+ expect_length = expect_params['length']
+ if not check_ipv6(expect_params, packet):
+ return False
+ icmp = packet.getlayer(sp.ICMPv6EchoReply)
+ if not icmp:
+ LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
+ return False
+ if icmp.data != build_payload(expect_length):
+ LOGGER.debug('Payload magic does not match!')
+ return False
+ return True
+
+
+def check_ping_request(expect_params, packet):
+ src_address = expect_params.get('src_address')
+ dst_address = expect_params.get('dst_address')
+ if not (src_address or dst_address):
+ raise Exception('Source or destination address must be given to match the ping request!')
+ if (
+ (src_address and ':' in src_address) or
+ (dst_address and ':' in dst_address)
+ ):
+ return check_ping_request_6(expect_params, packet)
+ else:
+ return check_ping_request_4(expect_params, packet)
+
+
+def check_ping_reply(expect_params, packet):
+ src_address = expect_params.get('src_address')
+ dst_address = expect_params.get('dst_address')
+ if not (src_address or dst_address):
+ raise Exception('Source or destination address must be given to match the ping reply!')
+ if (
+ (src_address and ':' in src_address) or
+ (dst_address and ':' in dst_address)
+ ):
+ return check_ping_reply_6(expect_params, packet)
+ else:
+ return check_ping_reply_4(expect_params, packet)
+
+
+def check_tcp(expect_params, packet):
+ tcp_flags = expect_params.get('tcp_flags')
+ mss = expect_params.get('mss')
+ seq = expect_params.get('seq')
+ tcp = packet.getlayer(sp.TCP)
+ if not tcp:
+ LOGGER.debug('Packet is not TCP!')
+ return False
+ chksum = tcp.chksum
+ tcp.chksum = None
+ newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
+ new_chksum = newpacket[sp.TCP].chksum
+ if chksum != new_chksum:
+ LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
+ return False
+ if tcp_flags and tcp.flags != tcp_flags:
+ LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
+ return False
+ if seq:
+ if tcp_flags == 'S':
+ tcp_seq = tcp.seq
+ elif tcp_flags == 'SA':
+ tcp_seq = tcp.ack - 1
+ if seq != tcp_seq:
+ LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
+ return False
+ if mss:
+ for option in tcp.options:
+ if option[0] == 'MSS':
+ if option[1] != mss:
+ LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
+ return False
+ return True
+
+
+def check_tcp_syn_request_4(expect_params, packet):
+ if not check_ipv4(expect_params, packet):
+ return False
+ if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
+ return False
+ return True
+
+
+def check_tcp_syn_reply_4(expect_params, packet):
+ if not check_ipv4(expect_params, packet):
+ return False
+ if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
+ return False
+ return True
+
+
+def check_tcp_syn_request_6(expect_params, packet):
+ if not check_ipv6(expect_params, packet):
+ return False
+ if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
+ return False
+ return True
+
+
+def check_tcp_syn_reply_6(expect_params, packet):
+ if not check_ipv6(expect_params, packet):
+ return False
+ if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
+ return False
+ return True
+
+
+def check_tcp_syn_request(expect_params, packet):
+ src_address = expect_params.get('src_address')
+ dst_address = expect_params.get('dst_address')
+ if not (src_address or dst_address):
+ raise Exception('Source or destination address must be given to match the tcp syn request!')
+ if (
+ (src_address and ':' in src_address) or
+ (dst_address and ':' in dst_address)
+ ):
+ return check_tcp_syn_request_6(expect_params, packet)
+ else:
+ return check_tcp_syn_request_4(expect_params, packet)
+
+
+def check_tcp_syn_reply(expect_params, packet):
+ src_address = expect_params.get('src_address')
+ dst_address = expect_params.get('dst_address')
+ if not (src_address or dst_address):
+ raise Exception('Source or destination address must be given to match the tcp syn reply!')
+ if (
+ (src_address and ':' in src_address) or
+ (dst_address and ':' in dst_address)
+ ):
+ return check_tcp_syn_reply_6(expect_params, packet)
+ else:
+ return check_tcp_syn_reply_4(expect_params, packet)
+
+
+def setup_sniffer(recvif, ping_type, sniff_type, expect_params):
+ if ping_type == 'icmp' and sniff_type == 'request':
+ checkfn = check_ping_request
+ elif ping_type == 'icmp' and sniff_type == 'reply':
+ checkfn = check_ping_reply
+ elif ping_type == 'tcpsyn' and sniff_type == 'request':
+ checkfn = check_tcp_syn_request
+ elif ping_type == 'tcpsyn' and sniff_type == 'reply':
+ checkfn = check_tcp_syn_reply
+ else:
+ raise Exception('Unspported ping or sniff type')
+
+ return Sniffer(expect_params, checkfn, recvif)
+
+
+def parse_args():
+ parser = argparse.ArgumentParser("pft_ping.py",
+ description="Ping test tool")
+
+ # Parameters of sent ping request
+ parser.add_argument('--sendif', nargs=1,
+ required=True,
+ help='The interface through which the packet(s) will be sent')
+ parser.add_argument('--to', nargs=1,
+ required=True,
+ help='The destination IP address for the ping request')
+ parser.add_argument('--ping-type',
+ choices=('icmp', 'tcpsyn'),
+ help='Type of ping: ICMP (default) or TCP SYN',
+ default='icmp')
+ parser.add_argument('--fromaddr', nargs=1,
+ help='The source IP address for the ping request')
+
+ # Where to look for packets to analyze.
+ # The '+' format is ugly as it mixes positional with optional syntax.
+ # But we have no positional parameters so I guess it's fine to use it.
+ parser.add_argument('--recvif', nargs='+',
+ help='The interfaces on which to expect the ping request')
+ parser.add_argument('--replyif', nargs='+',
+ help='The interfaces which to expect the ping response')
+
+ # Packet settings
+ parser_send = parser.add_argument_group('Values set in transmitted packets')
+ parser_send.add_argument('--send-flags', nargs=1, type=str,
+ help='IPv4 fragmentation flags')
+ parser_send.add_argument('--send-hlim', nargs=1, type=int,
+ help='IPv6 Hop Limit or IPv4 Time To Live')
+ parser_send.add_argument('--send-mss', nargs=1, type=int,
+ help='TCP Maximum Segment Size')
+ parser_send.add_argument('--send-seq', nargs=1, type=int,
+ help='TCP sequence number')
+ parser_send.add_argument('--send-length', nargs=1, type=int,
+ default=[len(PAYLOAD_MAGIC)], help='ICMP Echo Request payload size')
+ parser_send.add_argument('--send-tc', nargs=1, type=int,
+ help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
+ parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
+ help='Include unaligned TCP options')
+
+ # Expectations
+ parser_expect = parser.add_argument_group('Values expected in sniffed packets')
+ parser_expect.add_argument('--expect-flags', nargs=1, type=str,
+ help='IPv4 fragmentation flags')
+ parser_expect.add_argument('--expect-hlim', nargs=1, type=int,
+ help='IPv6 Hop Limit or IPv4 Time To Live')
+ parser_expect.add_argument('--expect-mss', nargs=1, type=int,
+ help='TCP Maximum Segment Size')
+ parser_send.add_argument('--expect-seq', nargs=1, type=int,
+ help='TCP sequence number')
+ parser_expect.add_argument('--expect-tc', nargs=1, type=int,
+ help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
+
+ parser.add_argument('-v', '--verbose', action='store_true',
+ help=('Enable verbose logging. Apart of potentially useful information '
+ 'you might see warnings from parsing packets like NDP or other '
+ 'packets not related to the test being run. Use only when '
+ 'developing because real tests expect empty stderr and stdout.'))
+
+ return parser.parse_args()
def main():
- parser = argparse.ArgumentParser("pft_ping.py",
- description="Ping test tool")
- parser.add_argument('--sendif', nargs=1,
- required=True,
- help='The interface through which the packet(s) will be sent')
- parser.add_argument('--recvif', nargs=1,
- help='The interface on which to expect the ICMP echo request')
- parser.add_argument('--replyif', nargs=1,
- help='The interface on which to expect the ICMP echo response')
- parser.add_argument('--checkdup', nargs=1,
- help='The interface on which to expect the duplicated ICMP packets')
- parser.add_argument('--ip6', action='store_true',
- help='Use IPv6')
- parser.add_argument('--to', nargs=1,
- required=True,
- help='The destination IP address for the ICMP echo request')
- parser.add_argument('--fromaddr', nargs=1,
- help='The source IP address for the ICMP echo request')
-
- # TCP options
- parser.add_argument('--tcpsyn', action='store_true',
- help='Send a TCP SYN packet')
- parser.add_argument('--tcpopt_unaligned', action='store_true',
- help='Include unaligned TCP options')
-
- # Packet settings
- parser.add_argument('--send-tos', nargs=1,
- help='Set the ToS value for the transmitted packet')
- parser.add_argument('--send-tc', nargs=1,
- help='Set the traffic class value for the transmitted packet')
-
- # Expectations
- parser.add_argument('--expect-tos', nargs=1,
- help='The expected ToS value in the received packet')
- parser.add_argument('--expect-tc', nargs=1,
- help='The expected traffic class value in the received packet')
-
- args = parser.parse_args()
-
- # We may not have a default route. Tell scapy where to start looking for routes
- sp.conf.iface6 = args.sendif[0]
-
- sniffer = None
- if not args.recvif is None:
- checkfn=check_ping_request
- if args.tcpsyn:
- checkfn=check_tcpsyn
-
- sniffer = Sniffer(args, checkfn, args.recvif[0])
-
- replysniffer = None
- if not args.replyif is None:
- checkfn=check_ping_reply
- replysniffer = Sniffer(args, checkfn, args.replyif[0])
-
- dupsniffer = None
- if args.checkdup is not None:
- dupsniffer = Sniffer(args, check_dup, args.checkdup[0])
-
- if args.tcpsyn:
- tcpsyn(args.sendif[0], args.to[0], args)
- else:
- if args.ip6:
- ping6(args.sendif[0], args.to[0], args)
- else:
- ping(args.sendif[0], args.to[0], args)
-
- if dupsniffer:
- dupsniffer.join()
- if dup_found != 1:
- sys.exit(1)
-
- if sniffer:
- sniffer.join()
-
- if sniffer.correctPackets:
- sys.exit(0)
- else:
- sys.exit(1)
-
- if replysniffer:
- replysniffer.join()
-
- if replysniffer.correctPackets:
- sys.exit(0)
- else:
- sys.exit(1)
+ args = parse_args()
+
+ if args.verbose:
+ LOGGER.setLevel(logging.DEBUG)
+
+ # Dig out real values of program arguments
+ send_if = args.sendif[0]
+ reply_ifs = args.replyif
+ recv_ifs = args.recvif
+ dst_address = args.to[0]
+
+ # Standardize parameters which have nargs=1.
+ send_params = {}
+ expect_params = {}
+ for param_name in ('flags', 'hlim', 'length', 'mss', 'seq', 'tc'):
+ param_arg = vars(args).get(f'send_{param_name}')
+ send_params[param_name] = param_arg[0] if param_arg else None
+ param_arg = vars(args).get(f'expect_{param_name}')
+ expect_params[param_name] = param_arg[0] if param_arg else None
+
+ expect_params['length'] = send_params['length']
+ send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
+ send_params['src_address'] = args.fromaddr[0] if args.fromaddr else None
+
+ # We may not have a default route. Tell scapy where to start looking for routes
+ sp.conf.iface6 = send_if
+
+ # Configuration sanity checking.
+ if not (reply_ifs or recv_ifs):
+ raise Exception('With no reply or recv interface specified no traffic '
+ 'can be sniffed and verified!'
+ )
+
+ sniffers = []
+
+ if recv_ifs:
+ sniffer_params = copy(expect_params)
+ sniffer_params['src_address'] = None
+ sniffer_params['dst_address'] = dst_address
+ for iface in recv_ifs:
+ LOGGER.debug(f'Installing receive sniffer on {iface}')
+ sniffers.append(
+ setup_sniffer(iface, args.ping_type, 'request', sniffer_params,
+ ))
+
+ if reply_ifs:
+ sniffer_params = copy(expect_params)
+ sniffer_params['src_address'] = dst_address
+ sniffer_params['dst_address'] = None
+ for iface in reply_ifs:
+ LOGGER.debug(f'Installing reply sniffer on {iface}')
+ sniffers.append(
+ setup_sniffer(iface, args.ping_type, 'reply', sniffer_params,
+ ))
+
+ LOGGER.debug(f'Installed {len(sniffers)} sniffers')
+
+ send_ping(dst_address, send_if, args.ping_type, send_params)
+
+ err = 0
+ sniffer_num = 0
+ for sniffer in sniffers:
+ sniffer.join()
+ if sniffer.correctPackets == 1:
+ LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
+ else:
+ # Set a bit in err for each failed sniffer.
+ err |= 1<<sniffer_num
+ if sniffer.correctPackets > 1:
+ LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
+ else:
+ LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')
+ sniffer_num += 1
+
+ return err
+
if __name__ == '__main__':
- main()
+ sys.exit(main())
diff --git a/tests/sys/netpfil/common/tos.sh b/tests/sys/netpfil/common/tos.sh
--- a/tests/sys/netpfil/common/tos.sh
+++ b/tests/sys/netpfil/common/tos.sh
@@ -68,7 +68,7 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --expect-tos 36
+ --expect-tc 36
# Check if the firewall is able to set the ToS bits
# and persists the EN bits (if already set)
@@ -82,8 +82,8 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --send-tos 3 \
- --expect-tos 39
+ --send-tc 3 \
+ --expect-tc 39
# Check if the firewall is able to filter the
# packets based on the ToS value
@@ -97,13 +97,13 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --send-tos 36
+ --send-tc 36
atf_check -s exit:0 $(atf_get_srcdir)/pft_ping.py \
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --send-tos 32
+ --send-tc 32
}
tos_cleanup()
diff --git a/tests/sys/netpfil/pf/checksum.sh b/tests/sys/netpfil/pf/checksum.sh
--- a/tests/sys/netpfil/pf/checksum.sh
+++ b/tests/sys/netpfil/pf/checksum.sh
@@ -64,15 +64,15 @@
--sendif ${epair_in}a \
--to 198.51.100.2 \
--recvif ${epair_out}b \
- --tcpsyn
+ --ping-type tcpsyn
# And unaligned
atf_check -s exit:0 ${common_dir}/pft_ping.py \
--sendif ${epair_in}a \
--to 198.51.100.2 \
--recvif ${epair_out}b \
- --tcpsyn \
- --tcpopt_unaligned
+ --ping-type tcpsyn \
+ --send-tcpopt-unaligned
}
unaligned_cleanup()
diff --git a/tests/sys/netpfil/pf/dup.sh b/tests/sys/netpfil/pf/dup.sh
--- a/tests/sys/netpfil/pf/dup.sh
+++ b/tests/sys/netpfil/pf/dup.sh
@@ -67,8 +67,7 @@
atf_check -s exit:0 ${common_dir}/pft_ping.py \
--sendif ${epair_send}a \
--to 198.51.100.3 \
- --recv ${epair_recv}a \
- --checkdup ${epair_dupto}a
+ --recv ${epair_recv}a ${epair_dupto}a
}
dup_to_cleanup()
diff --git a/tests/sys/netpfil/pf/forward.sh b/tests/sys/netpfil/pf/forward.sh
--- a/tests/sys/netpfil/pf/forward.sh
+++ b/tests/sys/netpfil/pf/forward.sh
@@ -122,7 +122,6 @@
# Sanity check, can we forward ICMP echo requests without pf?
atf_check -s exit:0 ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair_send}a \
--to 2001:db8:43::3 \
--recvif ${epair_recv}a
@@ -133,7 +132,6 @@
pft_set_rules alcatraz \
"block in inet6 proto icmp6 icmp6-type echoreq"
atf_check -s exit:1 ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair_send}a \
--to 2001:db8:43::3 \
--recvif ${epair_recv}a
@@ -142,7 +140,6 @@
pft_set_rules alcatraz \
"block out inet6 proto icmp6 icmp6-type echoreq"
atf_check -s exit:1 -e ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair_send}a \
--to 2001:db8:43::3 \
--recvif ${epair_recv}a
@@ -152,7 +149,6 @@
"block out" \
"pass out inet6 proto icmp6"
atf_check -s exit:0 ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair_send}a \
--to 2001:db8:43::3 \
--recvif ${epair_recv}a
@@ -162,7 +158,6 @@
"block out inet6 proto icmp6 icmp6-type echoreq" \
"pass in proto icmp"
atf_check -s exit:1 ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair_send}a \
--to 2001:db8:43::3 \
--recvif ${epair_recv}a
diff --git a/tests/sys/netpfil/pf/killstate.sh b/tests/sys/netpfil/pf/killstate.sh
--- a/tests/sys/netpfil/pf/killstate.sh
+++ b/tests/sys/netpfil/pf/killstate.sh
@@ -130,7 +130,6 @@
# Sanity check & establish state
atf_check -s exit:0 -o ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair}a \
--to 2001:db8::2 \
--replyif ${epair}a
diff --git a/tests/sys/netpfil/pf/set_tos.sh b/tests/sys/netpfil/pf/set_tos.sh
--- a/tests/sys/netpfil/pf/set_tos.sh
+++ b/tests/sys/netpfil/pf/set_tos.sh
@@ -66,7 +66,7 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --expect-tos 42
+ --expect-tc 42
# The requested ToS is set
pft_set_rules alcatraz "scrub out proto icmp set-tos 42"
@@ -74,7 +74,7 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --expect-tos 42
+ --expect-tc 42
# ToS is not changed if the scrub rule does not match
pft_set_rules alcatraz "scrub out proto tcp set-tos 42"
@@ -82,7 +82,7 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --expect-tos 42
+ --expect-tc 42
# Multiple scrub rules match as expected
pft_set_rules alcatraz "scrub out proto tcp set-tos 13" \
@@ -91,15 +91,15 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --expect-tos 14
+ --expect-tc 14
# And this works even if the packet already has ToS values set
atf_check -s exit:0 ${common_dir}/pft_ping.py \
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --send-tos 42 \
- --expect-tos 14
+ --send-tc 42 \
+ --expect-tc 14
# ToS values are unmolested if the packets do not match a scrub rule
pft_set_rules alcatraz "scrub out proto tcp set-tos 13"
@@ -107,8 +107,8 @@
--sendif ${epair_send}a \
--to 198.51.100.3 \
--recvif ${epair_recv}a \
- --send-tos 42 \
- --expect-tos 42
+ --send-tc 42 \
+ --expect-tc 42
}
v4_cleanup()
@@ -147,7 +147,6 @@
# No change is done if not requested
pft_set_rules alcatraz "scrub out proto ipv6-icmp"
atf_check -s exit:1 -o ignore -e ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair}a \
--to 2001:db8:192::2 \
--replyif ${epair}a \
@@ -156,7 +155,6 @@
# The requested ToS is set
pft_set_rules alcatraz "scrub out proto ipv6-icmp set-tos 42"
atf_check -s exit:0 -o ignore -e ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair}a \
--to 2001:db8:192::2 \
--replyif ${epair}a \
@@ -165,7 +163,6 @@
# ToS is not changed if the scrub rule does not match
pft_set_rules alcatraz "scrub out from 2001:db8:192::3 set-tos 42"
atf_check -s exit:1 -o ignore -e ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair}a \
--to 2001:db8:192::2 \
--replyif ${epair}a \
@@ -175,7 +172,6 @@
pft_set_rules alcatraz "scrub out from 2001:db8:192::3 set-tos 13" \
"scrub out proto ipv6-icmp set-tos 14"
atf_check -s exit:0 -o ignore -e ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair}a \
--to 2001:db8:192::2 \
--replyif ${epair}a \
@@ -183,7 +179,6 @@
# And this works even if the packet already has ToS values set
atf_check -s exit:0 -o ignore -e ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair}a \
--to 2001:db8:192::2 \
--replyif ${epair}a \
@@ -193,7 +188,6 @@
# ToS values are unmolested if the packets do not match a scrub rule
pft_set_rules alcatraz "scrub out from 2001:db8:192::3 set-tos 13"
atf_check -s exit:0 -o ignore -e ignore ${common_dir}/pft_ping.py \
- --ip6 \
--sendif ${epair}a \
--to 2001:db8:192::2 \
--replyif ${epair}a \
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Jan 25, 7:21 AM (19 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
16130549
Default Alt Text
D38122.diff (32 KB)
Attached To
Mode
D38122: netpfil tests: Improve pft_ping.py
Attached
Detach File
Event Timeline
Log In to Comment