#!/usr/bin/env python3 import argparse import re import sys import time import traceback import psutil import checker.nagios as nagios from checker import list_to_markdown_table, print_icinga2_check_status from checker.linuxfabric.base import get_state parser = argparse.ArgumentParser(description='Check network interface bandwidth utilization.') parser.add_argument('--bandwidth', type=float, default=0, help='Bandwidth speed in Mbps. Used to calculate percentage. Default is 0 which disables warning and critical levels.') parser.add_argument('--critical', type=int, default=75, help='Critical if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--warn', type=int, default=50, help='Warning if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--max', type=int, default=None, help='Set the max value the bandwidth can be. Useful for graphs and whatever.') parser.add_argument('--ignore', nargs='*', default=['lo'], help='Interface names to ignore, separated by a space. Default: lo') parser.add_argument('--ignore-re', default=None, help='Regex matching interface names to ignore.') args = parser.parse_args() # Icinga2 will merge the args to one string if len(args.ignore) == 1: args.ignore = args.ignore[0].split(' ') if args.ignore_re: ignore_re = re.compile(args.ignore_re) else: ignore_re = None def get_interface_data(interface: str, data: list): for y in data: if y[0] == interface: return y def get_network_traffic(interface): net_io = psutil.net_io_counters(pernic=True) if interface in net_io: return net_io[interface] else: raise ValueError(f"Interface '{interface}' not found") def calculate_network_traffic(interface, interval=1): initial_traffic = get_network_traffic(interface) start_time = time.perf_counter() # Should be more accurate that time.sleep() while True: current_time = time.perf_counter() elapsed_time = current_time - start_time if elapsed_time >= interval: break final_traffic = get_network_traffic(interface) sent_bytes = final_traffic.bytes_sent - initial_traffic.bytes_sent recv_bytes = final_traffic.bytes_recv - initial_traffic.bytes_recv sent_speed = sent_bytes / elapsed_time recv_speed = recv_bytes / elapsed_time # Convert bytes per second to megabits per second sent_speed_mbps = sent_speed * 8 / (1024 * 1024) recv_speed_mbps = recv_speed * 8 / (1024 * 1024) return sent_speed_mbps, recv_speed_mbps def main(): data = [] warn_value = (args.bandwidth * args.warn / 100) if args.bandwidth else 0 crit_value = (args.bandwidth * args.critical / 100) if args.bandwidth else 0 # Get network interface statistics net_io_counters = psutil.net_io_counters(pernic=True) # Calculate bandwidth utilization for each interface for interface, stats in net_io_counters.items(): if interface in args.ignore or (ignore_re and ignore_re.search(interface)): continue sent_speed, recv_speed = calculate_network_traffic(interface) bandwidth_utilization = sent_speed + recv_speed data.append([interface, sent_speed, recv_speed, bandwidth_utilization, 'none']) exit_code = nagios.OK critical = [] warn = [] ok = [] perfdata = {} for i in range(len(data)): interface = data[i][0] bandwidth_utilization = data[i][3] state_code = get_state(bandwidth_utilization, warn_value, crit_value, 'ge') if state_code == nagios.STATE_CRIT: critical.append(interface) state = 'critical' exit_code = max(exit_code, nagios.CRITICAL) elif state_code == nagios.STATE_WARN: warn.append(interface) state = 'warning' exit_code = max(exit_code, nagios.WARNING) else: ok.append(interface) state = 'ok' data[i][4] = f'[{state.upper()}]' perfdata.update({ interface: { 'value': round(bandwidth_utilization, 2), 'warn': warn_value, 'crit': crit_value, 'min': 0 if args.max else None, 'unit': 'Mb' } }) if exit_code == nagios.CRITICAL: listed_interfaces = [*critical, *warn] elif exit_code == nagios.WARNING: listed_interfaces = warn if exit_code != nagios.STATE_OK: listed_glances = [] for interface in listed_interfaces: listed_glances.append(f'{interface}: {round(get_interface_data(interface, data)[3], 2)} Mbps') glance_data = ", ".join(listed_glances) else: glance_data = 'all interfaces are ok' data = [(x[0], f'{round(x[3], 2)} Mbps', x[4]) for x in data] data.insert(0, ('Interface', 'Bandwidth', 'State')) print_icinga2_check_status( f'{glance_data}\n{list_to_markdown_table(data, align="left", seperator="!", borders=False)}', exit_code, perfdata) sys.exit(exit_code) if __name__ == "__main__": try: main() except Exception as e: print(f'UNKNOWN: exception "{e}"') print(traceback.format_exc()) sys.exit(nagios.UNKNOWN)