diff --git a/check_elastiflow_traffic_for_host.py b/check_elastiflow_traffic_for_host.py new file mode 100755 index 0000000..31d154c --- /dev/null +++ b/check_elastiflow_traffic_for_host.py @@ -0,0 +1,53 @@ +import json + +import requests +from requests.auth import HTTPBasicAuth + +from checker.units import filesize + +es_url = 'http://xxxxx:9200/elastiflow-flow-ecs-*/_search' + +query = { + "query": { + "bool": { + "must": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + }, + { + "term": { + "client.ip": "10.0.0.9" + } + } + ] + } + }, + "aggs": { + "total_traffic": { + "sum": { + "field": "network.bytes" + } + } + } +} + +# Headers +headers = {'Content-Type': 'application/json'} + +username = 'elastic' +password = 'xxx' + +response = requests.post(es_url, headers=headers, data=json.dumps(query), auth=HTTPBasicAuth(username, password)) +data = response.json() +total_bytes = 0 + +for hit in data['hits']['hits']: + total_bytes += hit['_source']['network.bytes'] + +total_bytes_h = filesize(total_bytes) +print(f'Total bytes: {total_bytes_h}') diff --git a/check_opnsense_traffic_for_host_watcher.py b/check_opnsense_traffic_for_host_watcher.py new file mode 100755 index 0000000..182e207 --- /dev/null +++ b/check_opnsense_traffic_for_host_watcher.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +import argparse +import sys +import traceback +from ipaddress import ip_network + +import requests +from urllib3.exceptions import InsecureRequestWarning + +import checker.nagios as nagios +from checker import print_icinga2_check_status +from checker.http import fetch_with_retry +from checker.linuxfabric.base import get_state +from checker.markdown import list_to_markdown_table +from checker.units import filesize + + +def is_internet_traffic(ip): + private_networks = [ + ip_network("10.0.0.0/8"), + ip_network("172.16.0.0/12"), + ip_network("192.168.0.0/16"), + ] + return not any(ip in network for network in private_networks) + + +def main(): + parser = argparse.ArgumentParser(description='Check OPNsense network traffic for a host.') + parser.add_argument('--opnsense', required=True, help='OPNsense hostname or IP address.') + parser.add_argument('--host', required=True, help='Address of the host to check.') + parser.add_argument('--duration', default=60, type=int, help='How many seconds to gather statistics.') + parser.add_argument('--fail-empty', action='store_true', help='If the API did not return any data, fail with UNKNOWN. Otherwise, assume that there was no traffic.') + parser.add_argument('--bandwidth', type=float, required=True, help='Bandwidth speed in Mbps. Used to calculate percentage.') + parser.add_argument('--bandwidth-crit', type=int, default=75, help='Critical if percent of bandwidth usage is greater than or equal to this.') + parser.add_argument('--bandwidth-warn', type=int, default=50, help='Warning if percent of bandwidth usage is greater than or equal to this.') + parser.add_argument('--conn-crit', type=int, default=-1, help='Set critical level for number of connections. Default: -1 (disabled).') + parser.add_argument('--conn-warn', type=int, default=-1, help='Set warning level for number of connections. Default: -1 (disabled).') + parser.add_argument('--timeout', type=int, default=10, help='Timeout in seconds for the HTTP requests to OPNsense. Default: 10.') + args = parser.parse_args() + + check_result = {} + + requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) + + response = fetch_with_retry(f'{args.opnsense}/traffic/{args.host}?seconds={args.duration}', + headers={'Accept': 'application/json'}, verify=False, + timeout=args.timeout) + traffic_data = response.json()['data'] + + if not len(traffic_data) and args.fail_empty: + print_icinga2_check_status(f'interface or host not found in OPNsense API response. Raw response:\n{traffic_data}', nagios.STATE_UNKNOWN) + sys.exit(nagios.UNKNOWN) + elif not len(traffic_data): + # There was no traffic. + check_result = { + 'rate_in': 0, + 'rate_out': 0, + 'cumulative_in': 0, + 'cumulative_out': 0, + 'connections': 0 + } + else: + try: + check_result = { + 'rate_in': traffic_data['max_rate_in'], + 'rate_out': traffic_data['max_rate_out'], + 'cumulative_in': traffic_data['bytes_in'], + 'cumulative_out': traffic_data['bytes_out'], + 'connections': traffic_data['connections'] + } + except Exception as e: + print_icinga2_check_status(f'failed to parse traffic data: {e}\n{traceback.format_exc()}\n{traffic_data}', nagios.STATE_UNKNOWN) + sys.exit(nagios.UNKNOWN) + + warn_b_value = int((args.bandwidth * args.bandwidth_warn / 100) * 1e+6) + crit_b_value = int((args.bandwidth * args.bandwidth_crit / 100) * 1e+6) + conn_warn = args.conn_warn if args.conn_warn > -1 else None + conn_crit = args.conn_crit if args.conn_crit > -1 else None + + exit_code = nagios.OK + critical = [] + warn = [] + ok = [] + perf_data = {} + + output_table = [ + ('Max Rate In', 'Max Rate Out', 'Cumulative In', 'Cumulative Out', 'Connections', 'Status') + ] + + def check(name, state, value, do_filesize=True): + # TODO: improve this its kinda messy + def do_value(v): + # TODO: make filesize() handle rate + return filesize(v) + 'ps' if do_filesize else v + + nonlocal exit_code + if state == nagios.STATE_CRIT: + critical.append((name, do_value(value))) + exit_code = max(nagios.STATE_CRIT, exit_code) + return nagios.STATE_CRIT + elif state == nagios.STATE_WARN: + warn.append((name, do_value(value))) + exit_code = max(nagios.STATE_WARN, exit_code) + return nagios.STATE_WARN + elif state == nagios.STATE_OK: + exit_code = max(nagios.STATE_OK, exit_code) + ok.append((name, do_value(value))) + return nagios.STATE_OK + + in_state = get_state(check_result['rate_in'], warn_b_value, crit_b_value, 'ge') + in_exit_code = check(f'rate_in', in_state, check_result['rate_in']) + + out_state = get_state(check_result['rate_out'], warn_b_value, crit_b_value, 'ge') + out_exit_code = check(f'rate_out', out_state, check_result['rate_out']) + + conn_state = get_state(check_result['connections'], conn_warn, conn_crit, 'ge') + conn_exit_code = check(f'connections', conn_state, check_result['connections'], do_filesize=False) + conn_exit_code = 0 + + interface_status_code = max(in_exit_code, out_exit_code, conn_exit_code) + if interface_status_code == nagios.STATE_OK: + interface_status = '[OK]' + elif interface_status_code == nagios.STATE_WARN: + interface_status = '[WARNING]' + elif interface_status_code == nagios.STATE_CRIT: + interface_status = '[CRITICAL]' + else: + interface_status = '[UNKNOWN]' + + perf_data[f'max_rate_in'] = {'value': int(check_result["rate_in"]), 'warn': warn_b_value, 'crit': crit_b_value, 'min': 0, 'unit': 'B'} + perf_data[f'max_rate_out'] = {'value': int(check_result["rate_out"]), 'warn': warn_b_value, 'crit': crit_b_value, 'min': 0, 'unit': 'B'} + perf_data[f'cumulative_in'] = {'value': int(check_result["cumulative_in"]), 'warn': warn_b_value, 'crit': crit_b_value, 'min': 0, 'unit': 'B'} + perf_data[f'cumulative_out'] = {'value': int(check_result["cumulative_out"]), 'warn': warn_b_value, 'crit': crit_b_value, 'min': 0, 'unit': 'B'} + perf_data[f'connections'] = {'value': int(check_result["connections"]), 'warn': conn_warn, 'crit': conn_crit, 'min': 0} + + output_table.append((filesize(check_result['rate_in']), filesize(check_result['rate_out']), + filesize(check_result['cumulative_in']), filesize(check_result['cumulative_out']), check_result['connections'], + interface_status)) + + text_result = ', '.join(f'{name}: {rate}' for name, rate in [*critical, *warn, *ok]) + if len(check_result) > 1: + text_result = text_result + '\n' + list_to_markdown_table(output_table, align='left', seperator='!', borders=False) + + print_icinga2_check_status(text_result, exit_code, perf_data) + sys.exit(exit_code) + + +if __name__ == "__main__": + try: + main() + except Exception as e: + print_icinga2_check_status(f'exception "{e}"\n{traceback.format_exc()}', nagios.STATE_UNKNOWN) + sys.exit(nagios.UNKNOWN) diff --git a/opnsense_traffic_watcher.py b/opnsense_traffic_watcher.py new file mode 100644 index 0000000..942950c --- /dev/null +++ b/opnsense_traffic_watcher.py @@ -0,0 +1,256 @@ +import argparse +import json +import os +import threading +import time +from typing import List + +import redis +import requests +from flask import jsonify, request, Flask +from flask_caching import Cache +from urllib3.exceptions import InsecureRequestWarning + +from checker.units import filesize + +requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) + +# pip install redis flask flask-caching + +MAX_POINTS_PER_IP = 1900 + +OPNSENSE_URL = os.environ.get('OPN_URL') +OPNSENSE_KEY = os.environ.get('OPN_KEY') +OPNSENSE_SECRET = os.environ.get('OPN_SECRET') + + +class TrafficEntry: + def __init__(self, interface: str, address: str, rate_bits_in: int, rate_bits_out: int, rate_bits: int, cumulative_bytes_in: int, cumulative_bytes_out: int, cumulative_bytes: int, connections: dict, timestamp: float): + self.interface = interface + self.address = address + self.rate_bits_in = rate_bits_in + self.rate_bits_out = rate_bits_out + self.rate_bits = rate_bits + self.cumulative_bytes_in = cumulative_bytes_in + self.cumulative_bytes_out = cumulative_bytes_out + self.cumulative_bytes = cumulative_bytes + self.connections = connections + self.timestamp = timestamp + + def to_json(self): + return self.__dict__ + + @classmethod + def from_json(cls, json_str): + data = json.loads(json_str) + return cls(**data) + + +class OpnsenseTraffic: + def __init__(self): + self.redis = redis.Redis(host='localhost', port=6379, db=1) + + def flush(self): + self.redis.flushdb() + + def add_entry(self, item: TrafficEntry): + # TODO: kick out the oldest item + + key = f"{item.interface}:{item.address}" + if self.redis.llen(key) >= MAX_POINTS_PER_IP: + self.redis.lpop(key) + + self.redis.rpush(key, json.dumps(item.to_json())) + + def get_address(self, input_address: str) -> List[TrafficEntry]: + keys = self.redis.keys(f"*:{input_address}") + data = [] + for key in keys: + entries = self.redis.lrange(key, 0, -1) + data.extend([TrafficEntry.from_json(entry.decode()) for entry in entries]) + return data + + def get_entries(self, input_address: str): + keys = self.redis.keys() + data = {} + for key in keys: + interface, address = key.decode().split(":") + if address != input_address: + continue + entries = self.redis.lrange(key, 0, -1) + if interface not in data: + data[interface] = {} + data[interface][address] = [TrafficEntry.from_json(entry.decode()).to_json() for entry in entries] + return data + + def get_traffic(self, address: str, minus_seconds: int = 0, human: bool = False): + max_rate_in = 0 + max_rate_out = 0 + bytes_in = 0 + bytes_out = 0 + connections = 0 + + if minus_seconds == 0: + minus_sec_diff = 0 + else: + minus_sec_diff = int(time.time()) - minus_seconds + + address_traffic = self.get_address(address) + for entry in address_traffic: + if entry.timestamp >= minus_sec_diff: + max_rate_in = max(max_rate_in, entry.rate_bits_in) + max_rate_out = max(max_rate_out, entry.rate_bits_out) + bytes_in += entry.cumulative_bytes_in + bytes_out += entry.cumulative_bytes_out + connections += len(entry.connections) + + if human: + return filesize(max_rate_in), filesize(max_rate_out), filesize(bytes_in), filesize(bytes_out), connections + else: + return max_rate_in, max_rate_out, bytes_in, bytes_out, connections + + +def get_interfaces(): + r = redis.Redis(host='localhost', port=6379, db=2) + try: + return json.loads(r.get('interfaces')) + except Exception as e: + return [] + + +def get_interface_names(): + r = redis.Redis(host='localhost', port=6379, db=2) + # Map interface names to their internal names + while True: + interfaces_mapping_response = requests.get(f'{OPNSENSE_URL}/api/diagnostics/traffic/interface', + headers={'Accept': 'application/json'}, auth=(OPNSENSE_KEY, OPNSENSE_SECRET), + verify=False) + interfaces_mapping_response.raise_for_status() + + interfaces = list(interfaces_mapping_response.json()['interfaces'].keys()) + if 'interface' in interfaces: + interfaces.remove('interface') + + # Store the interfaces in Redis + r.set('interfaces', json.dumps(interfaces)) + + time.sleep(60) + + +def background_thread(): + traffic_data = OpnsenseTraffic() + traffic_data.flush() + while True: + start_time = time.time() + interface_req = ','.join(get_interfaces()) + response = requests.get(f'{OPNSENSE_URL}/api/diagnostics/traffic/top/{interface_req}', + headers={'Accept': 'application/json'}, auth=(OPNSENSE_KEY, OPNSENSE_SECRET), verify=False) + response.raise_for_status() + timestamp = time.time() + + for interface, data in response.json().items(): + for item in data.get('records'): + traffic_data.add_entry( + TrafficEntry( + address=item['address'], + interface=interface, + rate_bits=item['rate_bits'], + rate_bits_in=item['rate_bits_in'], + rate_bits_out=item['rate_bits_out'], + cumulative_bytes=item['cumulative_bytes'], + cumulative_bytes_in=item['cumulative_bytes_in'], + cumulative_bytes_out=item['cumulative_bytes_out'], + connections=item['details'], + timestamp=timestamp + ) + ) + end_time = time.time() + api_request_time = end_time - start_time + adjusted_sleep_duration = max(1 - api_request_time, 0) + time.sleep(adjusted_sleep_duration) + + +flask_traffic_data = OpnsenseTraffic() +app = Flask(__name__) +cache = Cache(app, config={ + "CACHE_TYPE": "RedisCache", + "CACHE_REDIS_HOST": "127.0.0.1", + "port": 6379 +}) + + +@app.route('/traffic/
', methods=['GET']) +@app.route('/traffic/
/', methods=['GET']) +@cache.cached(timeout=10, query_string=True) +def get_traffic(address, interface=None): + minus_seconds = request.args.get('seconds', default=0, type=int) + human = request.args.get('human', default=False, type=bool) + max_rate_in, max_rate_out, bytes_in, bytes_out, connections = flask_traffic_data.get_traffic(address, minus_seconds, human) + entries = flask_traffic_data.get_entries(address) + num_entries = 0 + + if interface: + if interface not in entries.keys(): + return 'Interface not found for address', 404 + entries = entries[interface] + num_entries = len(entries) + return jsonify({ + interface: { + 'max_rate_in': max_rate_in, 'max_rate_out': max_rate_out, 'bytes_in': bytes_in, 'bytes_out': bytes_out, 'connections': connections + }, + 'entries': num_entries + }) + else: + for interface in entries: + num_entries += len(entries[interface][list(entries[interface].keys())[0]]) + return jsonify({ + 'data': { + 'max_rate_in': max_rate_in, 'max_rate_out': max_rate_out, 'bytes_in': bytes_in, 'bytes_out': bytes_out, 'connections': connections + }, + 'entries': num_entries + }) + + +@app.route('/data/', methods=['GET']) +@cache.cached(timeout=10) +def get_entries(ip): + entries = flask_traffic_data.get_entries(ip) + # Remove the IP address from the dict + new_entries = {} + for key, value in entries.items(): + new_entries[key] = [] + for sub_key, sub_value in value.items(): + new_entries[key].extend(sub_value) + return jsonify(new_entries) + + +@app.route('/interfaces', methods=['GET']) +@cache.cached(timeout=10) +def flask_get_interfaces(): + return jsonify(get_interfaces()) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--daemon', action='store_true', help='Run the background daemon.') + args = parser.parse_args() + + if args.daemon: + t1 = threading.Thread(target=get_interface_names, daemon=True) + t1.start() + + print('Fetching interface list... ', end='') + while not len(get_interfaces()): + time.sleep(2) + print('Done!') + + t2 = threading.Thread(target=background_thread, daemon=True) + t2.start() + + print('Threads started!') + + while True: + time.sleep(10000) + + else: + app.run(debug=True)