#!/usr/bin/env python3 import argparse import subprocess import sys import traceback from pathlib import Path from checker import print_icinga2_check_status, nagios from checker.linuxfabric.base import get_state from checker.types import try_float UPSC_PATH = '/usr/bin/upsc' UNWANTED_STATS = [ 'ups.mfr', 'ups.productid', 'device.serial', 'ups.vendorid', 'device.type', 'battery.type', 'ups.serial', 'device.mfr', 'battery.mfr.date' ] UNWANTED_STATS_STARTSWITH = [ 'ups.beeper.', 'ups.delay.', 'ups.timer.', ] STATS_LEVELS = [ 'battery.charge.low', 'battery.charge.warning', 'input.transfer.high', 'input.transfer.low', 'input.voltage.nominal', 'ups.realpower.nominal', 'battery.runtime.low', 'battery.voltage.nominal', ] VALUE_COMPARISONS = { 'load': 'ge', 'battery_charge': 'le', 'battery_runtime': 'le', 'battery_voltage': 'le', 'input_voltage': 'le', 'output_voltage': 'le' } VALUE_UNITS = { 'load': '%', 'battery_charge': '%', 'battery_runtime': 's', 'battery_voltage': 'V', 'input_voltage': 'V', 'output_voltage': 'V' } def parse_ups_status(status: str): # https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data status = status.lower() if status == 'ol': return 'on line', nagios.STATE_OK elif status == 'ob': return 'on battery', nagios.STATE_CRIT elif status == 'lb': return 'low battery', nagios.STATE_WARN elif status == 'hb': return 'high battery', nagios.STATE_CRIT elif status == 'rb': return 'replace battery', nagios.STATE_WARN elif status == 'chrg': return 'battery charging', nagios.STATE_OK elif status == 'dischrg': # inverter is providing load power return 'battery discharging', nagios.STATE_WARN elif status == 'bypass': return 'battery bypass', nagios.STATE_WARN elif status == 'cal': return 'calibrating', nagios.STATE_WARN elif status == 'off': return 'off', nagios.STATE_CRIT elif status == 'trim': return 'trimming incoming voltage', nagios.STATE_WARN elif status == 'boost': return 'boosting incoming voltage', nagios.STATE_WARN elif status == 'fsd': return 'forced shutdown', nagios.STATE_CRIT else: return 'unknown', nagios.STATE_UNKNOWN def calc_voltage_wiggle(percent: float, nominal_voltage): deviation = nominal_voltage * percent return nominal_voltage - deviation def parse_upsc(output: str): return [y for y in output.split('\n') if not y.startswith('Init SSL without certificate database') and y != ''] def main(args): if not Path('/usr/bin/upsc').is_file(): print_icinga2_check_status(f'Could not find "{UPSC_PATH}". Make sure "nut-client" is installed.', nagios.STATE_UNKNOWN) sys.exit(nagios.STATE_UNKNOWN) try: cmd = subprocess.check_output(f'{UPSC_PATH} {args.path} > /dev/stdout 2> /dev/stdout', shell=True) ups_stats = {x[0]: x[1] for x in [y.split(': ') for y in parse_upsc(cmd.decode())]} except Exception as e: if isinstance(e, subprocess.CalledProcessError): lines = parse_upsc(e.output.decode()) if len(lines) and 'data stale' in lines[0].lower(): print_icinga2_check_status(f'Failed to get UPS status: data stale', nagios.STATE_CRIT) sys.exit(nagios.STATE_CRIT) else: print_icinga2_check_status(f'Failed to get UPS status: {output}', nagios.STATE_UNKNOWN) print_icinga2_check_status(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN) sys.exit(nagios.STATE_UNKNOWN) # Remove unwanted stats. for k, v in ups_stats.copy().items(): if k.startswith('driver.'): del ups_stats[k] for x in UNWANTED_STATS: if ups_stats.get(x): del ups_stats[x] for x in UNWANTED_STATS_STARTSWITH: for k, v in ups_stats.copy().items(): if k.startswith(x): del ups_stats[k] # Grab the levels from the stats. levels = {} for level in STATS_LEVELS: for k, v in ups_stats.copy().items(): if k == level: levels[k] = try_float(v) del ups_stats[k] # Grab the test results. ups_test_result = None if ups_stats.get('ups.test.result'): ups_test_result = ups_stats['ups.test.result'] del ups_stats['ups.test.result'] # Grab the UPS status. ups_status = ups_stats['ups.status'] del ups_stats['ups.status'] ups_status = ups_status.split(' ') # Grab the UPS model ups_model = None if ups_stats.get('device.model'): ups_model = ups_stats['device.model'] del ups_stats['device.model'] if ups_stats.get('ups.model'): ups_model = ups_stats['ups.model'] del ups_stats['ups.model'] # Easier to read. if ups_stats.get('ups.load'): ups_stats['load'] = ups_stats['ups.load'] del ups_stats['ups.load'] # Load the perfdata. perf_data = {} for k, v in ups_stats.items(): name = k.replace('.', '_') perf_data[name] = {'value': try_float(v), 'warn': None, 'crit': None, 'min': 0, 'unit': VALUE_UNITS.get(name)} # Set the perfdata values based on the levels. if perf_data.get('battery_charge'): perf_data['battery_charge']['warn'] = levels['battery.charge.warning'] perf_data['battery_charge']['crit'] = levels['battery.charge.low'] if perf_data.get('battery_runtime') and levels.get('battery.runtime.low'): perf_data['battery_runtime']['warn'] = args.runtime_warn perf_data['battery_runtime']['crit'] = levels.get('battery.runtime.low', 0) # Set wiggle values wiggle_warn = args.wiggle_crit * 0.01 wiggle_crit = args.wiggle_crit * 0.01 if perf_data.get('input_voltage') and levels.get('input.voltage.nominal'): nominal = levels.get('input.voltage.nominal') if nominal: perf_data['input_voltage']['warn'] = calc_voltage_wiggle(wiggle_warn, nominal) perf_data['input_voltage']['crit'] = calc_voltage_wiggle(wiggle_crit, nominal) if perf_data.get('battery_voltage') and levels.get('battery.voltage.nominal'): nominal = levels.get('battery.voltage.nominal') if nominal: perf_data['battery_voltage']['warn'] = calc_voltage_wiggle(wiggle_warn, nominal) perf_data['battery_voltage']['crit'] = calc_voltage_wiggle(wiggle_crit, nominal) output_nominal = levels.get('input.voltage.nominal') if output_nominal: perf_data['output_voltage']['warn'] = calc_voltage_wiggle(wiggle_warn, output_nominal) perf_data['output_voltage']['crit'] = calc_voltage_wiggle(wiggle_crit, output_nominal) # Set the perfdata values based on the input args. perf_data['load']['warn'] = args.load_warn perf_data['load']['crit'] = args.load_crit # Determine our exit code based on the perfdata. exit_code = nagios.STATE_OK exit_msg = [] for metric, value in perf_data.items(): comparison = VALUE_COMPARISONS.get(metric, 'le') state = get_state(value['value'], value['warn'], value['crit'], comparison) if state != nagios.STATE_OK: if metric == 'battery_runtime' and args.ignore_bad_runtime: continue v_pretty = value["value"] if str(v_pretty).endswith('.0'): v_pretty = int(value["value"]) name = ' '.join(metric.split('_')) msg = f'{name} is {v_pretty}' exit_msg.append(msg) exit_code = max(exit_code, state) # Determine our exit code based on the self-test results. if not args.ignore_bad_test and ups_test_result.lower() != 'done and passed' and ups_test_result.lower() != 'no test initiated': exit_code = nagios.STATE_CRIT exit_msg.insert(0, f'test failed: "{ups_test_result}"') # Determine our exit code based on the UPS status. status_msg = [] for status in ups_status: msg, code = parse_ups_status(status) if code != nagios.STATE_OK: status_msg.append(msg) exit_code = max(exit_code, code) status_text = ', '.join(status_msg) if not len(status_msg): runtime = round(perf_data["battery_runtime"]["value"] / 60, 1) if str(runtime).endswith('.0'): runtime = str(runtime).strip('.0') status_text = f'{int(perf_data["battery_charge"]["value"])}% charge and has {runtime} minutes of runtime' else: status_text = f'status: {status_text}' text_result = status_text + '. ' + ', '.join(exit_msg).capitalize() print_icinga2_check_status(text_result, exit_code, perf_data) sys.exit(exit_code) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Check NUT.") parser.add_argument("--path", required=True, help="Path to the UPS to check. Example: `ups_name@localhost`") parser.add_argument("--load-warn", default=75, type=int, help="Load warning level.") parser.add_argument("--load-crit", default=85, type=int, help="Load critical level.") parser.add_argument("--runtime-warn", default=None, type=int, help="Manually set the battery runtime warning level. Default: disabled") parser.add_argument("--wiggle-warn", default=5, type=int, help="Value to use to determine warning level if a voltage changes this percentage from the nominal input.") parser.add_argument("--wiggle-crit", default=10, type=int, help="Value to use to determine critical level if a voltage changes this percentage from the nominal input.") parser.add_argument("--ignore-bad-runtime", action='store_true', help="Ignore the battery runtime value. Useful when you know the battery is getting old.") parser.add_argument("--ignore-bad-test", action='store_true', help="Ignore bad test results.") args = parser.parse_args() try: main(args) except Exception as e: print_icinga2_check_status(f'exception "{e}" \n {traceback.format_exc()}', nagios.STATE_UNKNOWN) sys.exit(nagios.STATE_UNKNOWN)