diff --git a/check_nut.py b/check_nut.py
new file mode 100644
index 0000000..9da1eb6
--- /dev/null
+++ b/check_nut.py
@@ -0,0 +1,293 @@
+#!/usr/bin/env python3
+"""Icinga2/Nagios check for a UPS monitored through Network UPS Tools (NUT).
+
+Runs ``upsc`` for the given UPS, turns the reported variables into perfdata
+and derives the check state from thresholds, the self-test result and the
+flags in ``ups.status``.
+"""
+import argparse
+import subprocess
+import sys
+import traceback
+from pathlib import Path
+
+from checker import print_icinga2_check_status, nagios
+from checker.linuxfabric.base import get_state
+from checker.types import try_float
+
+UPSC_PATH = '/usr/bin/upsc'
+UNWANTED_STATS = [
+    'ups.mfr',
+    'ups.productid',
+    'device.serial',
+    'ups.vendorid',
+    'device.type',
+    'battery.type',
+    'ups.serial',
+    'device.mfr',
+    'battery.mfr.date'
+]
+UNWANTED_STATS_STARTSWITH = [
+    'ups.beeper.',
+    'ups.delay.',
+    'ups.timer.',
+]
+STATS_LEVELS = [
+    'battery.charge.low',
+    'battery.charge.warning',
+    'input.transfer.high',
+    'input.transfer.low',
+    'input.voltage.nominal',
+    'ups.realpower.nominal',
+    'battery.runtime.low',
+    'battery.voltage.nominal',
+]
+VALUE_COMPARISONS = {
+    'load': 'ge',
+    'battery_charge': 'le',
+    'battery_runtime': 'le',
+    'battery_voltage': 'le',
+    'input_voltage': 'le',
+    'output_voltage': 'le'
+}
+# Flags found in ``ups.status`` mapped to (description, check state).
+# https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data
+UPS_STATUS_FLAGS = {
+    'ol': ('on line', nagios.STATE_OK),
+    'ob': ('on battery', nagios.STATE_CRIT),
+    'lb': ('low battery', nagios.STATE_WARN),
+    'hb': ('high battery', nagios.STATE_CRIT),
+    'rb': ('replace battery', nagios.STATE_WARN),
+    'chrg': ('battery charging', nagios.STATE_OK),
+    # inverter is providing load power
+    'dischrg': ('battery discharging', nagios.STATE_WARN),
+    'bypass': ('battery bypass', nagios.STATE_WARN),
+    'cal': ('calibrating', nagios.STATE_WARN),
+    'off': ('off', nagios.STATE_CRIT),
+    'trim': ('trimming incoming voltage', nagios.STATE_WARN),
+    'boost': ('boosting incoming voltage', nagios.STATE_WARN),
+    'fsd': ('forced shutdown', nagios.STATE_CRIT),
+}
+
+
+def parse_ups_status(status: str):
+    """Translate one ``ups.status`` flag into ``(description, state)``.
+
+    The lookup is case-insensitive.  Flags that are not in
+    ``UPS_STATUS_FLAGS`` yield ``('unknown', nagios.STATE_UNKNOWN)``.
+    """
+    return UPS_STATUS_FLAGS.get(status.lower(), ('unknown', nagios.STATE_UNKNOWN))
+
+
+def calc_voltage_wiggle(percent: float, nominal_voltage):
+    """Return the voltage that lies ``percent`` below ``nominal_voltage``.
+
+    ``percent`` is a fraction, e.g. ``0.05`` for 5 %.
+    """
+    deviation = nominal_voltage * percent
+    return nominal_voltage - deviation
+
+
+def parse_upsc(output: str):
+    """Split upsc output into lines, dropping blanks and the SSL notice."""
+    return [
+        line for line in output.split('\n')
+        if line and not line.startswith('Init SSL without certificate database')
+    ]
+
+
+def _exit_with(message: str, state):
+    """Print ``message`` as the check result and exit with ``state``."""
+    print_icinga2_check_status(message, state)
+    sys.exit(state)
+
+
+def _pretty_number(value):
+    """Return whole-number floats as ``int`` so they print without ``.0``."""
+    if isinstance(value, float) and value.is_integer():
+        return int(value)
+    return value
+
+
+def _read_ups_stats(ups_path: str) -> dict:
+    """Run upsc for ``ups_path`` and return its ``name: value`` lines as a dict.
+
+    Exits with CRITICAL when upsc reports stale data and with UNKNOWN when
+    upsc fails for any other reason.
+    """
+    try:
+        # Argument list instead of a shell string: ``ups_path`` comes from the
+        # command line and must never be interpreted by a shell.  stderr is
+        # merged into stdout so that error text ends up in ``e.output``.
+        raw = subprocess.check_output([UPSC_PATH, ups_path], stderr=subprocess.STDOUT)
+    except subprocess.CalledProcessError as e:
+        lines = parse_upsc(e.output.decode(errors='replace'))
+        if lines and 'data stale' in lines[0].lower():
+            _exit_with('Failed to get UPS status: data stale', nagios.STATE_CRIT)
+        _exit_with(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN)
+    except OSError as e:
+        _exit_with(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN)
+
+    stats = {}
+    for line in parse_upsc(raw.decode(errors='replace')):
+        # Split on the first separator only so values containing ': ' stay
+        # intact; lines without a separator are not variables.
+        key, sep, value = line.partition(': ')
+        if sep:
+            stats[key] = value
+    return stats
+
+
+def _battery_summary(perf_data: dict) -> str:
+    """Describe charge and runtime for the status line of a healthy UPS."""
+    charge = perf_data.get('battery_charge', {}).get('value')
+    runtime = perf_data.get('battery_runtime', {}).get('value')
+    if charge is None or runtime is None:
+        # One of the two values is missing, so there is nothing to summarize.
+        return 'status: ok'
+    # Whole minutes are shown as "30", everything else as "30.5".
+    # (str.strip('.0') must not be used here: it turns "30.0" into "3".)
+    minutes = _pretty_number(round(runtime / 60, 1))
+    return f'{int(charge)}% charge and has {minutes} minutes of runtime'
+
+
+def main(args):
+    """Run the check, print the result and exit with the Nagios state.
+
+    ``args`` is the namespace built by the argument parser at the bottom of
+    this file.  This function does not return; every path ends in ``sys.exit``.
+    """
+    if not Path(UPSC_PATH).is_file():
+        _exit_with(f'Could not find "{UPSC_PATH}". Make sure "nut-client" is installed.', nagios.STATE_UNKNOWN)
+
+    ups_stats = _read_ups_stats(args.path)
+
+    # Remove unwanted stats.
+    unwanted_prefixes = ('driver.', *UNWANTED_STATS_STARTSWITH)
+    ups_stats = {
+        k: v for k, v in ups_stats.items()
+        if k not in UNWANTED_STATS and not k.startswith(unwanted_prefixes)
+    }
+
+    # Grab the levels from the stats.
+    levels = {}
+    for level in STATS_LEVELS:
+        if level in ups_stats:
+            levels[level] = try_float(ups_stats.pop(level))
+
+    # Grab the test results.
+    ups_test_result = ups_stats.pop('ups.test.result', None)
+
+    # Grab the UPS status.
+    if 'ups.status' not in ups_stats:
+        _exit_with('Failed to get UPS status: upsc did not report "ups.status"', nagios.STATE_UNKNOWN)
+    ups_status = ups_stats.pop('ups.status').split(' ')
+
+    # The model is text and must not end up in the perfdata.
+    ups_stats.pop('device.model', None)
+    ups_stats.pop('ups.model', None)
+
+    # Easier to read.
+    if 'ups.load' in ups_stats:
+        ups_stats['load'] = ups_stats.pop('ups.load')
+
+    # Load the perfdata.
+    perf_data = {}
+    for k, v in ups_stats.items():
+        try:
+            value = try_float(v)
+        except ValueError:
+            # try_float() rejects non-numeric text; such variables cannot be
+            # expressed as perfdata, so skip them instead of aborting.
+            continue
+        perf_data[k.replace('.', '_')] = {'value': value, 'warn': None, 'crit': None, 'min': 0}
+
+    # Set the perfdata values based on the levels.
+    if 'battery_charge' in perf_data:
+        perf_data['battery_charge']['warn'] = levels.get('battery.charge.warning')
+        perf_data['battery_charge']['crit'] = levels.get('battery.charge.low')
+    if 'battery_runtime' in perf_data and levels.get('battery.runtime.low'):
+        perf_data['battery_runtime']['warn'] = args.runtime_warn
+        perf_data['battery_runtime']['crit'] = levels['battery.runtime.low']
+
+    # Set wiggle values
+    wiggle_warn = args.wiggle_warn * 0.01
+    wiggle_crit = args.wiggle_crit * 0.01
+    # The output voltage is checked against the nominal input voltage.
+    voltage_nominals = (
+        ('input_voltage', 'input.voltage.nominal'),
+        ('battery_voltage', 'battery.voltage.nominal'),
+        ('output_voltage', 'input.voltage.nominal'),
+    )
+    for metric, level in voltage_nominals:
+        nominal = levels.get(level)
+        if metric in perf_data and nominal:
+            perf_data[metric]['warn'] = calc_voltage_wiggle(wiggle_warn, nominal)
+            perf_data[metric]['crit'] = calc_voltage_wiggle(wiggle_crit, nominal)
+
+    # Set the perfdata values based on the input args.
+    if 'load' in perf_data:
+        perf_data['load']['warn'] = args.load_warn
+        perf_data['load']['crit'] = args.load_crit
+
+    # Determine our exit code based on the perfdata.
+    exit_code = nagios.STATE_OK
+    exit_msg = []
+    for metric, value in perf_data.items():
+        comparison = VALUE_COMPARISONS.get(metric, 'le')
+        state = get_state(value['value'], value['warn'], value['crit'], comparison)
+        if state == nagios.STATE_OK:
+            continue
+        if metric == 'battery_runtime' and args.ignore_bad_runtime:
+            continue
+        name = metric.replace('_', ' ')
+        exit_msg.append(f'{name} is {_pretty_number(value["value"])}')
+        exit_code = max(exit_code, state)
+
+    # Determine our exit code based on the self-test results.  Without a
+    # reported result there is nothing to evaluate.
+    if not args.ignore_bad_test and ups_test_result is not None:
+        if ups_test_result.lower() not in ('done and passed', 'no test initiated'):
+            exit_code = nagios.STATE_CRIT
+            exit_msg.insert(0, f'test failed: "{ups_test_result}"')
+
+    # Determine our exit code based on the UPS status.
+    status_msg = []
+    for status in ups_status:
+        msg, code = parse_ups_status(status)
+        if code != nagios.STATE_OK:
+            status_msg.append(msg)
+            exit_code = max(exit_code, code)
+
+    if status_msg:
+        status_text = f'status: {", ".join(status_msg)}'
+    else:
+        status_text = _battery_summary(perf_data)
+
+    text_result = status_text + '.'
+    if exit_msg:
+        details = ', '.join(exit_msg)
+        # Upper-case only the first letter; str.capitalize() would also
+        # lower-case the rest, including the quoted self-test result.
+        text_result += ' ' + details[:1].upper() + details[1:]
+    print_icinga2_check_status(text_result, exit_code, perf_data)
+    sys.exit(exit_code)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Check NUT.")
+    parser.add_argument("--path", required=True, help="Path to the UPS to check. Example: `ups_name@localhost`")
+    parser.add_argument("--load-warn", default=75, type=int, help="Load warning level.")
+    parser.add_argument("--load-crit", default=85, type=int, help="Load critical level.")
+    parser.add_argument("--runtime-warn", default=None, type=int, help="Manually set the battery runtime warning level. Default: disabled")
+    parser.add_argument("--wiggle-warn", default=5, type=int, help="Value to use to determine warning level if a voltage changes this percentage from the nominal input.")
+    parser.add_argument("--wiggle-crit", default=10, type=int, help="Value to use to determine critical level if a voltage changes this percentage from the nominal input.")
+    parser.add_argument("--ignore-bad-runtime", action='store_true', help="Ignore the battery runtime value. Useful when you know the battery is getting old.")
+    parser.add_argument("--ignore-bad-test", action='store_true', help="Ignore bad test results.")
+    args = parser.parse_args()
+
+    try:
+        main(args)
+    except Exception as e:
+        print_icinga2_check_status(f'exception "{e}" \n {traceback.format_exc()}', nagios.STATE_UNKNOWN)
+        sys.exit(nagios.STATE_UNKNOWN)
diff --git a/checker/types.py b/checker/types.py
new file mode 100644
index 0000000..3d31011
--- /dev/null
+++ b/checker/types.py
@@ -0,0 +1,13 @@
+def try_float(value: str) -> int | float:
+    """Convert ``value`` to a number, trying ``float`` first and then ``int``.
+
+    Raises ``ValueError`` when neither conversion succeeds.
+    """
+    # Only conversion failures are caught; a bare ``except`` would also
+    # swallow KeyboardInterrupt and SystemExit.
+    for convert in (float, int):
+        try:
+            return convert(value)
+        except (TypeError, ValueError, OverflowError):
+            pass
+    raise ValueError(f"Could not convert {value} to float or int")