#!/usr/bin/env python3 import argparse import sys import time import traceback from ipaddress import ip_address, ip_network import numpy as np import requests from urllib3.exceptions import InsecureRequestWarning import checker.nagios as nagios from checker.markdown import list_to_markdown_table from checker.units import filesize def is_internet_traffic(ip): private_networks = [ ip_network("10.0.0.0/8"), ip_network("172.16.0.0/12"), ip_network("192.168.0.0/16"), ] return not any(ip in network for network in private_networks) def get_traffic_top(args, interface): response = requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/top/{interface}', headers={'Accept': 'application/json'}, auth=(args.key, args.secret), verify=False, timeout=10) if response.status_code != 200: print(f'UNKNOWN: unable to query OPNsense API for {interface}: {response.status_code}\n{response.text}') sys.exit(nagios.UNKNOWN) return response.json() def main(): parser = argparse.ArgumentParser(description='Check OPNsense network traffic for a host.') parser.add_argument('--opnsense', required=True, help='OPNsense hostname or IP address.') parser.add_argument('--key', required=True, help='OPNsense API key.') parser.add_argument('--secret', required=True, help='OPNsense API secret.') parser.add_argument('--interface', required=True, help='Interface to check (e.g., lan). Can be something like "lan,wan"') parser.add_argument('--host', required=True, help='Address of the host to check.') parser.add_argument('--duration', default=10, type=int, help='How many seconds to gather statistics.') parser.add_argument('--fail-empty', action='store_true', help='If the API did not return any data, fail with UNKNOWN. Otherwise, assume that there was no traffic.') parser.add_argument('--bandwidth', type=float, required=True, help='Bandwidth speed in Mbps. Used to calculate percentage.') parser.add_argument('--bandwidth-critical', type=int, default=75, help='Critical if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--bandwidth-warn', type=int, default=50, help='Warning if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--conn-critical', type=int, default=-1, help='Set critical level for number of connections. Default: -1 (disabled).') parser.add_argument('--conn-warn', type=int, default=-1, help='Set warning level for number of connections. Default: -1 (disabled).') args = parser.parse_args() check_result = {} interface_names = {} requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) # Map interface names to their internal names interfaces_mapping = requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/interface', headers={'Accept': 'application/json'}, auth=(args.key, args.secret), verify=False, timeout=10) if interfaces_mapping.status_code != 200: print( f'UNKNOWN: unable to query OPNsense API for interface mappings: {interfaces_mapping.status_code}\n{interfaces_mapping.text}') sys.exit(nagios.UNKNOWN) interfaces_mapping = interfaces_mapping.json()['interfaces'] interfaces_to_check = set(args.interface.split(',')) for name, interface in interfaces_mapping.items(): if interface['name'] in interfaces_to_check: interfaces_to_check.remove(interface['name']) interface_names[interface['name']] = name if not len(interface_names.keys()): print(f'UNKNOWN: did not find any valid interface names! Double-check the name.') sys.exit(nagios.UNKNOWN) for name, interface in interface_names.items(): # Fetch the data traffic_data = [] for _ in range(args.duration): start_time = time.time() response = requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/top/{interface}', headers={'Accept': 'application/json'}, auth=(args.key, args.secret), verify=False, timeout=10) end_time = time.time() api_request_time = end_time - start_time if response.status_code != 200: print(f'UNKNOWN: unable to query OPNsense API for {interface}: {response.status_code}\n{response.text}') sys.exit(nagios.UNKNOWN) for item in response.json().get(interface, {}).get('records', False): if item['address'] == args.host: traffic_data.append(item) adjusted_sleep_duration = max(1 - api_request_time, 0) time.sleep(adjusted_sleep_duration) if not len(traffic_data) and args.fail_empty: print('UNKNOWN: Interface or host not found in OPNsense API response. Raw response:') print(traffic_data) sys.exit(nagios.UNKNOWN) elif not len(traffic_data): # There was no traffic. check_result[name] = { 'rate_in': 0, 'rate_out': 0, 'cumulative_in': 0, 'cumulative_out': 0, 'connections': 0 } else: try: check_result[name] = { 'rate_in': np.average([x['rate_bits_in'] for x in traffic_data]), 'rate_out': np.average([x['rate_bits_out'] for x in traffic_data]), 'cumulative_in': np.average([x['cumulative_bytes_in'] for x in traffic_data]), 'cumulative_out': np.average([x['cumulative_bytes_out'] for x in traffic_data]), 'connections': int(np.average([len(x['details']) for x in traffic_data])) } except Exception as e: print(f'UNKNOWN: Failed to parse traffic data: "{e}"') print(traceback.format_exc()) print('') print('Raw data:') print(traffic_data) sys.exit(nagios.UNKNOWN) warn_b_value = (args.bandwidth * args.bandwidth_warn / 100) * 1e+6 crit_b_value = (args.bandwidth * args.bandwidth_critical / 100) * 1e+6 exit_code = nagios.OK critical = [] warn = [] ok = [] perf_data = [] output_table = [ ('Host', 'Interface', 'Rate In', 'Rate Out', 'Cumulative In', 'Cumulative Out', 'Connections', 'Status') ] def check_b(name, value): nonlocal exit_code if value >= crit_b_value: critical.append((name, filesize(value))) exit_code = nagios.CRITICAL return '[CRITICAL]', nagios.CRITICAL elif value >= warn_b_value: warn.append((name, filesize(value))) exit_code = nagios.WARNING return '[WARNING]', nagios.WARNING else: ok.append((name, filesize(value))) return '[OK]', nagios.OK for name, data in check_result.items(): status = '[OK]' in_status, in_rc = check_b('rate_in', data['rate_in']) if in_rc >= exit_code: status = in_status out_status, out_rc = check_b('rate_out', data['rate_out']) if out_rc >= exit_code: status = out_status if data['connections'] >= args.conn_critical > 0: critical.append(('connections', data['connections'])) exit_code = nagios.CRITICAL status = '[CRITICAL]' elif data['connections'] >= args.conn_warn > 0: warn.append(('connections', data['connections'])) exit_code = nagios.WARNING status = '[WARNING]' else: ok.append(('connections', data['connections'])) perf_data.append(f'\'{name}_rate_in\'={int(data["rate_in"])}B;{warn_b_value};{crit_b_value};0;') perf_data.append(f'\'{name}_rate_out\'={int(data["rate_out"])}B;{warn_b_value};{crit_b_value};0;') perf_data.append(f'\'{name}_cumulative_in\'={int(data["cumulative_in"])}B;{warn_b_value};{crit_b_value};0;') perf_data.append(f'\'{name}_cumulative_out\'={int(data["cumulative_out"])}B;{warn_b_value};{crit_b_value};0;') perf_data.append(f'\'{name}_connections\'={int(data["connections"])}B;{warn_b_value};{crit_b_value};0;') output_table.append((args.host, name, filesize(data['rate_in']), filesize(data['rate_out']), filesize(data['cumulative_in']), filesize(data['cumulative_out']), data['connections'], status)) if len(critical): x = ['CRITICAL: '] for i in critical: x.append(f'{i[0]}: {i[1]}, ') print(''.join(x).strip(', ')) if len(warn): x = ['WARN: '] for i in warn: x.append(f'{i[0]}: {i[1]}') print(''.join(x).strip(', ')) if not len(warn) and not len(critical): print(f'OK: bandwidth is below {args.bandwidth} Mbps.') print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False)) print(f'| {" ".join(perf_data)}') sys.exit(exit_code) if __name__ == "__main__": try: main() except Exception as e: print(f'UNKNOWN: exception "{e}"') print(traceback.format_exc()) sys.exit(nagios.UNKNOWN)