icinga2-checks/check_nut.py

262 lines
10 KiB
Python

#!/usr/bin/env python3
import argparse
import subprocess
import sys
import traceback
from pathlib import Path
from checker import print_icinga2_check_status, nagios
from checker.linuxfabric.base import get_state
from checker.types import try_float
UPSC_PATH = '/usr/bin/upsc'
UNWANTED_STATS = [
'ups.mfr',
'ups.productid',
'device.serial',
'ups.vendorid',
'device.type',
'battery.type',
'ups.serial',
'device.mfr',
'battery.mfr.date'
]
UNWANTED_STATS_STARTSWITH = [
'ups.beeper.',
'ups.delay.',
'ups.timer.',
]
STATS_LEVELS = [
'battery.charge.low',
'battery.charge.warning',
'input.transfer.high',
'input.transfer.low',
'input.voltage.nominal',
'ups.realpower.nominal',
'battery.runtime.low',
'battery.voltage.nominal',
]
VALUE_COMPARISONS = {
'load': 'ge',
'battery_charge': 'le',
'battery_runtime': 'le',
'battery_voltage': 'le',
'input_voltage': 'le',
'output_voltage': 'le'
}
VALUE_UNITS = {
'load': '%',
'battery_charge': '%',
'battery_runtime': 's',
'battery_voltage': 'V',
'input_voltage': 'V',
'output_voltage': 'V'
}
def parse_ups_status(status: str):
# https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data
status = status.lower()
if status == 'ol':
return 'on line', nagios.STATE_OK
elif status == 'ob':
return 'on battery', nagios.STATE_CRIT
elif status == 'lb':
return 'low battery', nagios.STATE_WARN
elif status == 'hb':
return 'high battery', nagios.STATE_CRIT
elif status == 'rb':
return 'replace battery', nagios.STATE_WARN
elif status == 'chrg':
return 'battery charging', nagios.STATE_OK
elif status == 'dischrg':
# inverter is providing load power
return 'battery discharging', nagios.STATE_WARN
elif status == 'bypass':
return 'battery bypass', nagios.STATE_WARN
elif status == 'cal':
return 'calibrating', nagios.STATE_WARN
elif status == 'off':
return 'off', nagios.STATE_CRIT
elif status == 'trim':
return 'trimming incoming voltage', nagios.STATE_WARN
elif status == 'boost':
return 'boosting incoming voltage', nagios.STATE_WARN
elif status == 'fsd':
return 'forced shutdown', nagios.STATE_CRIT
else:
return 'unknown', nagios.STATE_UNKNOWN
def calc_voltage_wiggle(percent: float, nominal_voltage):
deviation = nominal_voltage * percent
return nominal_voltage - deviation
def parse_upsc(output: str):
return [y for y in output.split('\n') if not y.startswith('Init SSL without certificate database') and y != '']
def main(args):
if not Path('/usr/bin/upsc').is_file():
print_icinga2_check_status(f'Could not find "{UPSC_PATH}". Make sure "nut-client" is installed.', nagios.STATE_UNKNOWN)
sys.exit(nagios.STATE_UNKNOWN)
try:
cmd = subprocess.check_output(f'{UPSC_PATH} {args.path} > /dev/stdout 2> /dev/stdout', shell=True)
ups_stats = {x[0]: x[1] for x in [y.split(': ') for y in parse_upsc(cmd.decode())]}
except Exception as e:
if isinstance(e, subprocess.CalledProcessError):
lines = parse_upsc(e.output.decode())
if len(lines) and 'data stale' in lines[0].lower():
print_icinga2_check_status(f'Failed to get UPS status: data stale', nagios.STATE_CRIT)
sys.exit(nagios.STATE_CRIT)
else:
print_icinga2_check_status(f'Failed to get UPS status: {e} - "{e.output.decode()}"', nagios.STATE_UNKNOWN)
else:
print_icinga2_check_status(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN)
sys.exit(nagios.STATE_UNKNOWN)
# Remove unwanted stats.
for k, v in ups_stats.copy().items():
if k.startswith('driver.'):
del ups_stats[k]
for x in UNWANTED_STATS:
if ups_stats.get(x):
del ups_stats[x]
for x in UNWANTED_STATS_STARTSWITH:
for k, v in ups_stats.copy().items():
if k.startswith(x):
del ups_stats[k]
# Grab the levels from the stats.
levels = {}
for level in STATS_LEVELS:
for k, v in ups_stats.copy().items():
if k == level:
levels[k] = try_float(v)
del ups_stats[k]
# Grab the test results.
ups_test_result = None
if ups_stats.get('ups.test.result'):
ups_test_result = ups_stats['ups.test.result']
del ups_stats['ups.test.result']
# Grab the UPS status.
ups_status = ups_stats['ups.status']
del ups_stats['ups.status']
ups_status = ups_status.split(' ')
# Grab the UPS model
ups_model = None
if ups_stats.get('device.model'):
ups_model = ups_stats['device.model']
del ups_stats['device.model']
if ups_stats.get('ups.model'):
ups_model = ups_stats['ups.model']
del ups_stats['ups.model']
# Easier to read.
if ups_stats.get('ups.load'):
ups_stats['load'] = ups_stats['ups.load']
del ups_stats['ups.load']
# Load the perfdata.
perf_data = {}
for k, v in ups_stats.items():
name = k.replace('.', '_')
perf_data[name] = {'value': try_float(v), 'warn': None, 'crit': None, 'min': 0, 'unit': VALUE_UNITS.get(name)}
# Set the perfdata values based on the levels.
if perf_data.get('battery_charge'):
perf_data['battery_charge']['warn'] = levels['battery.charge.warning']
perf_data['battery_charge']['crit'] = levels['battery.charge.low']
if perf_data.get('battery_runtime') and levels.get('battery.runtime.low'):
perf_data['battery_runtime']['warn'] = args.runtime_warn
perf_data['battery_runtime']['crit'] = levels.get('battery.runtime.low', 0)
# Set wiggle values
wiggle_warn = args.wiggle_crit * 0.01
wiggle_crit = args.wiggle_crit * 0.01
if perf_data.get('input_voltage') and levels.get('input.voltage.nominal'):
nominal = levels.get('input.voltage.nominal')
if nominal:
perf_data['input_voltage']['warn'] = calc_voltage_wiggle(wiggle_warn, nominal)
perf_data['input_voltage']['crit'] = calc_voltage_wiggle(wiggle_crit, nominal)
if perf_data.get('battery_voltage') and levels.get('battery.voltage.nominal'):
nominal = levels.get('battery.voltage.nominal')
if nominal:
perf_data['battery_voltage']['warn'] = calc_voltage_wiggle(wiggle_warn, nominal)
perf_data['battery_voltage']['crit'] = calc_voltage_wiggle(wiggle_crit, nominal)
output_nominal = levels.get('input.voltage.nominal')
if output_nominal:
perf_data['output_voltage']['warn'] = calc_voltage_wiggle(wiggle_warn, output_nominal)
perf_data['output_voltage']['crit'] = calc_voltage_wiggle(wiggle_crit, output_nominal)
# Set the perfdata values based on the input args.
perf_data['load']['warn'] = args.load_warn
perf_data['load']['crit'] = args.load_crit
# Determine our exit code based on the perfdata.
exit_code = nagios.STATE_OK
exit_msg = []
for metric, value in perf_data.items():
comparison = VALUE_COMPARISONS.get(metric, 'le')
state = get_state(value['value'], value['warn'], value['crit'], comparison)
if state != nagios.STATE_OK:
if metric == 'battery_runtime' and args.ignore_bad_runtime:
continue
v_pretty = value["value"]
if str(v_pretty).endswith('.0'):
v_pretty = int(value["value"])
name = ' '.join(metric.split('_'))
msg = f'{name} is {v_pretty}'
exit_msg.append(msg)
exit_code = max(exit_code, state)
# Determine our exit code based on the self-test results.
if not args.ignore_bad_test and ups_test_result.lower() != 'done and passed' and ups_test_result.lower() != 'no test initiated':
exit_code = nagios.STATE_CRIT
exit_msg.insert(0, f'test failed: "{ups_test_result}"')
# Determine our exit code based on the UPS status.
status_msg = []
for status in ups_status:
msg, code = parse_ups_status(status)
if code != nagios.STATE_OK:
status_msg.append(msg)
exit_code = max(exit_code, code)
status_text = ', '.join(status_msg)
if not len(status_msg):
runtime = round(perf_data["battery_runtime"]["value"] / 60, 1)
if str(runtime).endswith('.0'):
runtime = str(runtime).strip('.0')
status_text = f'{int(perf_data["battery_charge"]["value"])}% charge and has {runtime} minutes of runtime'
else:
status_text = f'status: {status_text}'
text_result = status_text + '. ' + ', '.join(exit_msg).capitalize()
print_icinga2_check_status(text_result, exit_code, perf_data)
sys.exit(exit_code)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Check NUT.")
parser.add_argument("--path", required=True, help="Path to the UPS to check. Example: `ups_name@localhost`")
parser.add_argument("--load-warn", default=75, type=int, help="Load warning level.")
parser.add_argument("--load-crit", default=85, type=int, help="Load critical level.")
parser.add_argument("--runtime-warn", default=None, type=int, help="Manually set the battery runtime warning level. Default: disabled")
parser.add_argument("--wiggle-warn", default=5, type=int, help="Value to use to determine warning level if a voltage changes this percentage from the nominal input.")
parser.add_argument("--wiggle-crit", default=10, type=int, help="Value to use to determine critical level if a voltage changes this percentage from the nominal input.")
parser.add_argument("--ignore-bad-runtime", action='store_true', help="Ignore the battery runtime value. Useful when you know the battery is getting old.")
parser.add_argument("--ignore-bad-test", action='store_true', help="Ignore bad test results.")
args = parser.parse_args()
try:
main(args)
except Exception as e:
print_icinga2_check_status(f'exception "{e}" \n {traceback.format_exc()}', nagios.STATE_UNKNOWN)
sys.exit(nagios.STATE_UNKNOWN)