icinga2-checks/check_opnsense_traffic_for_...

147 lines
6.3 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import sys
import time
import traceback
import numpy as np
import requests
from urllib3.exceptions import InsecureRequestWarning
import checker.nagios as nagios
from checker.markdown import list_to_markdown_table
from checker.units import filesize
def _api_get(args, path):
    """Send an authenticated GET to ``/api/diagnostics/traffic/<path>`` on the firewall.

    Uses ``args.opnsense`` as the host and ``args.key``/``args.secret`` as HTTP basic-auth
    credentials. Certificate verification is disabled (``verify=False``) and the request
    times out after 10 seconds.
    """
    return requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/{path}',
                        headers={'Accept': 'application/json'}, auth=(args.key, args.secret),
                        verify=False, timeout=10)


def _summarize_samples(samples):
    """Average the counters of all collected records into one result dict.

    Raises whatever the lookups raise (e.g. ``KeyError``) when a record lacks an expected field;
    the caller turns that into an UNKNOWN result.
    """
    return {
        'rate_in': np.average([x['rate_bits_in'] for x in samples]),
        'rate_out': np.average([x['rate_bits_out'] for x in samples]),
        'cumulative_in': np.average([x['cumulative_bytes_in'] for x in samples]),
        'cumulative_out': np.average([x['cumulative_bytes_out'] for x in samples]),
        'connections': int(np.average([len(x['details']) for x in samples])),
    }


def main():
    """Sample OPNsense traffic statistics for one host and print a Nagios-style report.

    For every requested interface the ``top`` endpoint is polled ``--duration`` times (with a
    one-second pause after each poll), the records whose ``address`` equals ``--host`` are
    collected, and their counters are averaged. The result is printed as a table (one row per
    interface) followed by perf-data, and the process exits with a Nagios status code.

    Exits with ``nagios.UNKNOWN`` when the API answers with a non-200 status, when none of the
    requested interface names is known to the firewall, when the collected records cannot be
    parsed, or (only with ``--fail-empty``) when no record for the host was seen.
    """
    parser = argparse.ArgumentParser(description='Check OPNsense network traffic for a host.')
    parser.add_argument('--opnsense', required=True, help='OPNsense hostname or IP address.')
    parser.add_argument('--key', required=True, help='OPNsense API key.')
    parser.add_argument('--secret', required=True, help='OPNsense API secret.')
    parser.add_argument('--interface', required=True,
                        help='Interface to check (e.g., lan). Can be something like "lan,wan"')
    parser.add_argument('--host', required=True, help='Address of the host to check.')
    parser.add_argument('--duration', default=10, type=int, help='How many seconds to gather statistics.')
    parser.add_argument('--fail-empty', action='store_true',
                        help='If the API did not return any data, fail with UNKNOWN. Otherwise, assume that there was no traffic.')
    args = parser.parse_args()

    # Requests are made with verify=False, so silence the warning urllib3 would emit for each one.
    requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

    # Map interface names to their internal names
    mapping_response = _api_get(args, 'interface')
    if mapping_response.status_code != 200:
        print(
            f'UNKNOWN: unable to query OPNsense API for interface mappings: {mapping_response.status_code}\n{mapping_response.text}')
        sys.exit(nagios.UNKNOWN)
    interfaces_mapping = mapping_response.json()['interfaces']

    # Tolerate whitespace around the commas ("lan, wan") and ignore empty entries.
    interfaces_to_check = {part.strip() for part in args.interface.split(',') if part.strip()}
    interface_names = {}
    for internal_name, interface in interfaces_mapping.items():
        if interface['name'] in interfaces_to_check:
            # Removing the name means the first matching entry wins if a name occurs twice.
            interfaces_to_check.remove(interface['name'])
            interface_names[interface['name']] = internal_name

    if not interface_names:
        print('UNKNOWN: did not find any valid interface names! Double-check the name.')
        sys.exit(nagios.UNKNOWN)

    check_result = {}
    for name, interface in interface_names.items():
        # Fetch the data
        # TODO: account for network delays for the check duration
        traffic_data = []
        last_payload = None
        for _ in range(args.duration):
            response = _api_get(args, f'top/{interface}')
            if response.status_code != 200:
                print(f'UNKNOWN: unable to query OPNsense API for {interface}: {response.status_code}\n{response.text}')
                sys.exit(nagios.UNKNOWN)
            last_payload = response.json()
            # A missing/empty entry or a missing "records" key means "no traffic seen", not a crash.
            records = (last_payload.get(interface) or {}).get('records') or []
            traffic_data.extend(item for item in records if item['address'] == args.host)
            time.sleep(1)

        if not traffic_data:
            if args.fail_empty:
                print('UNKNOWN: Interface or host not found in OPNsense API response. Raw response:')
                # Show the most recent API payload; the filtered list is always empty here.
                print(last_payload)
                sys.exit(nagios.UNKNOWN)
            check_result[name] = {
                'rate_in': 0,
                'rate_out': 0,
                'cumulative_in': 0,
                'cumulative_out': 0,
                'connections': 0
            }
            continue

        try:
            check_result[name] = _summarize_samples(traffic_data)
        except Exception as e:
            print(f'UNKNOWN: Failed to parse traffic data: "{e}"')
            print(traceback.format_exc())
            print('')
            print('Raw data:')
            print(traffic_data)
            sys.exit(nagios.UNKNOWN)

    # TODO: figure out status
    print('OK: no metrics defined.')

    warn_value = 0
    crit_value = 0
    exit_code = nagios.OK
    perf_data = []
    output_table = [
        ('Host', 'Interface', 'Rate In', 'Rate Out', 'Cumulative In', 'Cumulative Out', 'Connections', 'Status'),
    ]
    for name, data in check_result.items():
        # TODO: figure out status
        # Placeholder comparison: with the thresholds at 0 neither branch can trigger yet.
        if -1 >= crit_value:
            status = '[CRITICAL]'
            exit_code = nagios.CRITICAL
        elif -1 >= warn_value:
            status = '[WARNING]'
            # Never let a later warning downgrade an earlier critical result.
            if exit_code != nagios.CRITICAL:
                exit_code = nagios.WARNING
        else:
            status = '[OK]'

        # NOTE(review): the rate fields are read from "rate_bits_*" but are labelled "B" here;
        # left unchanged so existing graphs keep their series - confirm the intended unit.
        perf_data.append(f"'{name}RateIn'={int(data['rate_in'])}B;{warn_value};{crit_value};0;")
        perf_data.append(f"'{name}RateOut'={int(data['rate_out'])}B;{warn_value};{crit_value};0;")
        perf_data.append(f"'{name}CumulativeIn'={int(data['cumulative_in'])}B;{warn_value};{crit_value};0;")
        perf_data.append(f"'{name}CumulativeOut'={int(data['cumulative_out'])}B;{warn_value};{crit_value};0;")
        # A connection count is a plain number, so it carries no unit of measurement.
        perf_data.append(f"'{name}Connections'={int(data['connections'])};{warn_value};{crit_value};0;")

        # One table row per interface, so every requested interface shows up in the report.
        output_table.append(
            (args.host, name, filesize(data['rate_in']), filesize(data['rate_out']), filesize(data['cumulative_in']),
             filesize(data['cumulative_out']), data['connections'], status)
        )

    print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False))
    print(f'| {" ".join(perf_data)}')
    sys.exit(exit_code)
if __name__ == "__main__":
    # Script entry point: any exception escaping main() is reported as an UNKNOWN check
    # result, with the traceback printed after the summary line.
    try:
        main()
    except Exception as exc:
        details = traceback.format_exc()
        print(f'UNKNOWN: exception "{exc}"')
        print(details)
        sys.exit(nagios.UNKNOWN)