icinga2-checks/check_opnsense_traffic_for_...

131 lines
5.5 KiB
Python
Raw Normal View History

2023-04-21 23:54:20 -06:00
#!/usr/bin/env python3
import argparse
import sys
import time
import traceback
import hurry
import numpy as np
import requests
from hurry.filesize import size
from urllib3.exceptions import InsecureRequestWarning
import checker.nagios as nagios
from checker.markdown import list_to_markdown_table
def filesize(bytes: int, spaces: bool = True):
    """Format a byte count as a human-readable size string.

    The value is rendered by ``hurry.filesize.size`` using the
    ``hurry.filesize.alternative`` unit system.

    :param bytes: the amount to format.
    :param spaces: when false, every space is stripped from the rendered
        string; when true the rendered string is returned unchanged.
    :return: the formatted size string.
    """
    formatted = size(bytes, system=hurry.filesize.alternative)
    return formatted if spaces else formatted.replace(' ', '')
def main():
    """Sample OPNsense traffic statistics for one host and print a Nagios-style report.

    Steps:

    1. Parse the command line arguments.
    2. Query ``/api/diagnostics/traffic/interface`` and map each requested
       interface name to the key the API uses for it.
    3. For every matched interface, query ``/api/diagnostics/traffic/top``
       ``--duration`` times (sleeping one second after each query) and keep
       the records whose ``address`` equals ``--host``.
    4. Average the collected records and print a table plus perfdata.

    The function never returns normally; it always terminates through
    ``sys.exit`` with a ``checker.nagios`` status code. ``UNKNOWN`` is used
    when an API call does not return HTTP 200, when none of the requested
    interfaces is known, or when no record for the host was collected on an
    interface. Thresholds are not implemented yet (see the TODOs), so the
    final status is otherwise always ``OK``.
    """
    parser = argparse.ArgumentParser(description='Check OPNsense network traffic for a host.')
    parser.add_argument('--opnsense', required=True, help='OPNsense hostname or IP address.')
    parser.add_argument('--key', required=True, help='OPNsense API key.')
    parser.add_argument('--secret', required=True, help='OPNsense API secret.')
    parser.add_argument('--interface', required=True, help='Interface to check (e.g., lan). Can be something like "lan,wan"')
    parser.add_argument('--host', required=True, help='IP of the host to check.')
    parser.add_argument('--duration', default=10, type=int, help='How many seconds to gather statistics.')
    args = parser.parse_args()

    check_result = {}
    interface_names = {}

    # Requests below are sent with verify=False, so silence the warning that
    # urllib3 would otherwise emit for each of them.
    requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

    # Options shared by every API call.
    request_options = {
        'headers': {'Accept': 'application/json'},
        'auth': (args.key, args.secret),
        'verify': False,
    }

    # Map interface names to their internal names
    interfaces_mapping = requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/interface', **request_options)
    if interfaces_mapping.status_code != 200:
        print(f'UNKNOWN: unable to query OPNsense API for interface mappings: {interfaces_mapping.status_code}\n{interfaces_mapping.text}')
        sys.exit(nagios.UNKNOWN)
    interfaces_mapping = interfaces_mapping.json()['interfaces']
    interfaces_to_check = set(args.interface.split(','))
    for name, interface in interfaces_mapping.items():
        if interface['name'] in interfaces_to_check:
            # Removing the matched name means that, should two entries share
            # a name, only the first one is used.
            interfaces_to_check.remove(interface['name'])
            interface_names[interface['name']] = name

    if not interface_names:
        print('UNKNOWN: did not find any valid interface names! Double-check the name.')
        sys.exit(nagios.UNKNOWN)

    for name, interface in interface_names.items():
        # Fetch the data
        # TODO: account for network delays for the check duration
        traffic_data = []
        for _ in range(args.duration):
            response = requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/top/{interface}', **request_options)
            if response.status_code != 200:
                print(f'UNKNOWN: unable to query OPNsense API for {interface}: {response.status_code}\n{response.text}')
                sys.exit(nagios.UNKNOWN)
            # A response without an entry (or without records) for this
            # interface must yield an empty list; the previous default of
            # False raised TypeError as soon as it was iterated.
            interface_payload = response.json().get(interface) or {}
            records = interface_payload.get('records') or []
            traffic_data.extend(item for item in records if item['address'] == args.host)
            time.sleep(1)

        if not traffic_data:
            # Reaching this point means no sample contained the host on this
            # particular interface, so report the host and that interface
            # rather than the raw --interface argument.
            print(f'UNKNOWN: no traffic records for host {args.host} found on interface {name} in OPNsense API response.')
            sys.exit(nagios.UNKNOWN)

        check_result[name] = {
            'rate_in': np.average([x['rate_bits_in'] for x in traffic_data]),
            'rate_out': np.average([x['rate_bits_out'] for x in traffic_data]),
            'cumulative_in': np.average([x['cumulative_bytes_in'] for x in traffic_data]),
            'cumulative_out': np.average([x['cumulative_bytes_out'] for x in traffic_data]),
            'connections': int(np.average([len(x['details']) for x in traffic_data]))
        }

    # TODO: figure out status
    print('OK: no metrics defined.')

    # Placeholder thresholds: with both set to 0 the "-1 >= ..." comparisons
    # below are never true, so every interface currently ends up as OK.
    warn_value = 0
    crit_value = 0
    exit_code = nagios.OK
    critical = []
    warn = []
    ok = []
    perf_data = []

    output_table = [('Interface', 'Rate In', 'Rate Out', 'Cumulative In', 'Cumulative Out', 'Connections', 'Status')]
    for name, data in check_result.items():
        # TODO: figure out status
        if -1 >= crit_value:
            critical.append(name)
            status = '[CRITICAL]'
            exit_code = nagios.CRITICAL
        elif -1 >= warn_value:
            warn.append(name)
            status = '[WARNING]'
            exit_code = nagios.WARNING
        else:
            ok.append(name)
            status = '[OK]'

        perf_data.append(f'{name}-rate_in={filesize(data["rate_in"], spaces=False)};{warn_value};{crit_value};')
        perf_data.append(f'{name}-rate_out={filesize(data["rate_out"], spaces=False)};{warn_value};{crit_value};')
        perf_data.append(f'{name}-cumulative_in={filesize(data["cumulative_in"], spaces=False)};{warn_value};{crit_value};')
        perf_data.append(f'{name}-cumulative_out={filesize(data["cumulative_out"], spaces=False)};{warn_value};{crit_value};')
        perf_data.append(f'{name}-connections={data["connections"]};{warn_value};{crit_value};')

        output_table.append((name, filesize(data['rate_in']), filesize(data['rate_out']), filesize(data['cumulative_in']), filesize(data['cumulative_out']),
                             data['connections'], status))

    print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False))
    print(f' |{" ".join(perf_data)}')
    sys.exit(exit_code)
# Script entry point: run the check and turn any unexpected exception into an
# UNKNOWN result. The sys.exit() calls made inside main() raise SystemExit,
# which is not a subclass of Exception and therefore passes through untouched.
if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        print(f'UNKNOWN: exception "{err}"')
        # Include the full traceback after the status line for debugging.
        trace = traceback.format_exc()
        print(trace)
        sys.exit(nagios.UNKNOWN)