add check_nut
This commit is contained in:
parent
6d7174ad63
commit
7c559a689a
|
@ -0,0 +1,250 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from checker import print_icinga2_check_status, nagios
|
||||||
|
from checker.linuxfabric.base import get_state
|
||||||
|
from checker.types import try_float
|
||||||
|
|
||||||
|
# Absolute path of the NUT client binary that is executed to query the UPS.
UPSC_PATH = '/usr/bin/upsc'

# Exact stat names that main() drops before building perfdata.
UNWANTED_STATS = [
    'ups.mfr', 'ups.productid', 'device.serial',
    'ups.vendorid', 'device.type', 'battery.type',
    'ups.serial', 'device.mfr', 'battery.mfr.date',
]

# Stat name prefixes that main() drops before building perfdata.
UNWANTED_STATS_STARTSWITH = ['ups.beeper.', 'ups.delay.', 'ups.timer.']

# Stats that main() moves out of the perfdata and into its "levels" dict,
# where they serve as thresholds / nominal values for other metrics.
STATS_LEVELS = [
    'battery.charge.low', 'battery.charge.warning',
    'input.transfer.high', 'input.transfer.low',
    'input.voltage.nominal', 'ups.realpower.nominal',
    'battery.runtime.low', 'battery.voltage.nominal',
]

# Comparison operator name handed to get_state() for each perfdata metric.
# Presumably 'ge' alerts on values at/above the threshold and 'le' on values
# at/below it -- confirm against checker.linuxfabric.base.get_state.
VALUE_COMPARISONS = {
    'load': 'ge',
    **dict.fromkeys(
        [
            'battery_charge',
            'battery_runtime',
            'battery_voltage',
            'input_voltage',
            'output_voltage',
        ],
        'le',
    ),
}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ups_status(status: str):
    """Translate one NUT ``ups.status`` flag into a description and a state.

    :param status: a single status flag such as ``OL`` or ``LB``; matching is
        case-insensitive and ignores surrounding whitespace.
    :return: ``(description, nagios_state)``. Flags that are not in the table
        yield ``('unknown', nagios.STATE_UNKNOWN)``.
    """
    # https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data
    known_flags = {
        'ol': ('on line', nagios.STATE_OK),
        'ob': ('on battery', nagios.STATE_CRIT),
        'lb': ('low battery', nagios.STATE_WARN),
        'hb': ('high battery', nagios.STATE_CRIT),
        'rb': ('replace battery', nagios.STATE_WARN),
        'chrg': ('battery charging', nagios.STATE_OK),
        # inverter is providing load power
        'dischrg': ('battery discharging', nagios.STATE_WARN),
        'bypass': ('battery bypass', nagios.STATE_WARN),
        'cal': ('calibrating', nagios.STATE_WARN),
        'off': ('off', nagios.STATE_CRIT),
        # Listed in the NUT status data but previously reported as "unknown".
        'over': ('overloaded', nagios.STATE_CRIT),
        'trim': ('trimming incoming voltage', nagios.STATE_WARN),
        'boost': ('boosting incoming voltage', nagios.STATE_WARN),
        'fsd': ('forced shutdown', nagios.STATE_CRIT),
    }
    return known_flags.get(status.strip().lower(), ('unknown', nagios.STATE_UNKNOWN))
|
||||||
|
|
||||||
|
|
||||||
|
def calc_voltage_wiggle(percent: float, nominal_voltage):
    """Return the voltage lying ``percent`` below ``nominal_voltage``.

    :param percent: allowed deviation as a fraction of the nominal voltage
        (main() passes the CLI percentage multiplied by 0.01).
    :param nominal_voltage: the nominal voltage to deviate from.
    :return: ``nominal_voltage`` reduced by ``nominal_voltage * percent``.
    """
    return nominal_voltage - nominal_voltage * percent
|
||||||
|
|
||||||
|
|
||||||
|
def parse_upsc(output: str):
    """Split raw ``upsc`` output into its meaningful lines.

    Empty lines and lines starting with the "Init SSL without certificate
    database" notice are discarded; every other line is returned unchanged,
    in order.
    """
    ssl_notice = 'Init SSL without certificate database'
    kept_lines = []
    for line in output.split('\n'):
        if line == '' or line.startswith(ssl_notice):
            continue
        kept_lines.append(line)
    return kept_lines
|
||||||
|
|
||||||
|
|
||||||
|
def _format_number(value):
    """Return whole-number floats as ``int`` so they print without a trailing ``.0``."""
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return value


def _numeric_or_none(raw):
    """Convert a raw upsc value with ``try_float``; return ``None`` if it is not numeric."""
    try:
        number = try_float(raw)
    except ValueError:
        return None
    return number if isinstance(number, (int, float)) else None


def main(args):
    """Query a UPS through ``upsc`` and report its health as an Icinga2 check.

    Reads ``path``, ``load_warn``, ``load_crit``, ``runtime_warn``,
    ``wiggle_warn``, ``wiggle_crit``, ``ignore_bad_runtime`` and
    ``ignore_bad_test`` from ``args``.

    The function never returns normally: every path prints a status line via
    ``print_icinga2_check_status`` and terminates with ``sys.exit`` using the
    matching Nagios state.
    """
    if not Path(UPSC_PATH).is_file():
        print_icinga2_check_status(f'Could not find "{UPSC_PATH}". Make sure "nut-client" is installed.', nagios.STATE_UNKNOWN)
        sys.exit(nagios.STATE_UNKNOWN)

    try:
        # Argument list without a shell, so args.path is never shell-interpreted.
        # stderr is merged into stdout so error text such as "data stale" is
        # part of the captured output.
        raw_output = subprocess.check_output([UPSC_PATH, args.path], stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        lines = parse_upsc((e.output or b'').decode(errors='replace'))
        if any('data stale' in line.lower() for line in lines):
            print_icinga2_check_status('Failed to get UPS status: data stale', nagios.STATE_CRIT)
            sys.exit(nagios.STATE_CRIT)
        print_icinga2_check_status(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN)
        sys.exit(nagios.STATE_UNKNOWN)
    except OSError as e:
        print_icinga2_check_status(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN)
        sys.exit(nagios.STATE_UNKNOWN)

    # Parse "name: value" lines. Split on the first separator only, so values
    # that themselves contain ": " stay intact; lines without one are skipped.
    ups_stats = {}
    for line in parse_upsc(raw_output.decode(errors='replace')):
        key, separator, value = line.partition(': ')
        if separator:
            ups_stats[key] = value

    # Remove unwanted stats.
    unwanted_prefixes = ('driver.', *UNWANTED_STATS_STARTSWITH)
    ups_stats = {
        k: v for k, v in ups_stats.items()
        if k not in UNWANTED_STATS and not k.startswith(unwanted_prefixes)
    }

    # Grab the levels from the stats.
    levels = {}
    for level_name in STATS_LEVELS:
        if level_name in ups_stats:
            number = _numeric_or_none(ups_stats.pop(level_name))
            if number is not None:
                levels[level_name] = number

    # Grab the test results (None when the UPS does not report any).
    ups_test_result = ups_stats.pop('ups.test.result', None) or None

    # Grab the UPS status.
    raw_status = ups_stats.pop('ups.status', None)
    if not raw_status:
        print_icinga2_check_status('UPS did not report "ups.status"', nagios.STATE_UNKNOWN)
        sys.exit(nagios.STATE_UNKNOWN)
    ups_status = raw_status.split()

    # The model strings are text, not metrics; keep them out of the perfdata.
    ups_stats.pop('device.model', None)
    ups_stats.pop('ups.model', None)

    # Easier to read.
    if 'ups.load' in ups_stats:
        ups_stats['load'] = ups_stats.pop('ups.load')

    # Load the perfdata. Text-only stats cannot be perfdata and are skipped.
    perf_data = {}
    for k, v in ups_stats.items():
        number = _numeric_or_none(v)
        if number is None:
            continue
        perf_data[k.replace('.', '_')] = {'value': number, 'warn': None, 'crit': None, 'min': 0}

    # Set the perfdata values based on the levels.
    if 'battery_charge' in perf_data:
        perf_data['battery_charge']['warn'] = levels.get('battery.charge.warning')
        perf_data['battery_charge']['crit'] = levels.get('battery.charge.low')
    if 'battery_runtime' in perf_data and levels.get('battery.runtime.low'):
        perf_data['battery_runtime']['warn'] = args.runtime_warn
        perf_data['battery_runtime']['crit'] = levels['battery.runtime.low']

    # Set wiggle values (CLI percentages converted to fractions).
    wiggle_warn = args.wiggle_warn * 0.01
    wiggle_crit = args.wiggle_crit * 0.01
    voltage_nominals = (
        ('input_voltage', 'input.voltage.nominal'),
        ('battery_voltage', 'battery.voltage.nominal'),
        # The output voltage is judged against the input nominal.
        ('output_voltage', 'input.voltage.nominal'),
    )
    for metric, level_name in voltage_nominals:
        nominal = levels.get(level_name)
        if metric in perf_data and nominal:
            perf_data[metric]['warn'] = calc_voltage_wiggle(wiggle_warn, nominal)
            perf_data[metric]['crit'] = calc_voltage_wiggle(wiggle_crit, nominal)

    # Set the perfdata values based on the input args.
    if 'load' in perf_data:
        perf_data['load']['warn'] = args.load_warn
        perf_data['load']['crit'] = args.load_crit

    # Determine our exit code based on the perfdata.
    exit_code = nagios.STATE_OK
    exit_msg = []
    for metric, data in perf_data.items():
        comparison = VALUE_COMPARISONS.get(metric, 'le')
        state = get_state(data['value'], data['warn'], data['crit'], comparison)
        if state == nagios.STATE_OK:
            continue
        if metric == 'battery_runtime' and args.ignore_bad_runtime:
            continue
        exit_msg.append(f'{metric.replace("_", " ")} is {_format_number(data["value"])}')
        exit_code = max(exit_code, state)

    # Determine our exit code based on the self-test results.
    if not args.ignore_bad_test and ups_test_result is not None:
        if ups_test_result.lower() not in ('done and passed', 'no test initiated'):
            exit_code = nagios.STATE_CRIT
            exit_msg.insert(0, f'test failed: "{ups_test_result}"')

    # Determine our exit code based on the UPS status.
    ok_statuses = []
    bad_statuses = []
    for flag in ups_status:
        msg, code = parse_ups_status(flag)
        if code == nagios.STATE_OK:
            ok_statuses.append(msg)
        else:
            bad_statuses.append(msg)
            exit_code = max(exit_code, code)

    if bad_statuses:
        status_text = f'status: {", ".join(bad_statuses)}'
    else:
        # Healthy status: summarise charge and runtime when they are reported.
        summary = []
        charge = perf_data.get('battery_charge')
        if charge is not None:
            summary.append(f'{int(charge["value"])}% charge')
        runtime = perf_data.get('battery_runtime')
        if runtime is not None:
            # battery_runtime is divided by 60 to present it as minutes.
            minutes = _format_number(float(round(runtime['value'] / 60, 1)))
            summary.append(f'has {minutes} minutes of runtime')
        if summary:
            status_text = ' and '.join(summary)
        else:
            status_text = f'status: {", ".join(ok_statuses)}'

    # Only append the problem list when there is one, to avoid a dangling ". ".
    if exit_msg:
        text_result = status_text + '. ' + ', '.join(exit_msg).capitalize()
    else:
        text_result = status_text
    print_icinga2_check_status(text_result, exit_code, perf_data)
    sys.exit(exit_code)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Command-line interface of the check.
    cli = argparse.ArgumentParser(description="Check NUT.")
    cli.add_argument(
        "--path",
        required=True,
        help="Path to the UPS to check. Example: `ups_name@localhost`",
    )
    cli.add_argument(
        "--load-warn",
        default=75,
        type=int,
        help="Load warning level.",
    )
    cli.add_argument(
        "--load-crit",
        default=85,
        type=int,
        help="Load critical level.",
    )
    cli.add_argument(
        "--runtime-warn",
        default=None,
        type=int,
        help="Manually set the battery runtime warning level. Default: disabled",
    )
    cli.add_argument(
        "--wiggle-warn",
        default=5,
        type=int,
        help="Value to use to determine warning level if a voltage changes this percentage from the nominal input.",
    )
    cli.add_argument(
        "--wiggle-crit",
        default=10,
        type=int,
        help="Value to use to determine critical level if a voltage changes this percentage from the nominal input.",
    )
    cli.add_argument(
        "--ignore-bad-runtime",
        action='store_true',
        help="Ignore the battery runtime value. Useful when you know the battery is getting old.",
    )
    cli.add_argument(
        "--ignore-bad-test",
        action='store_true',
        help="Ignore bad test results.",
    )
    args = cli.parse_args()

    # Any unexpected failure inside the check is reported as UNKNOWN together
    # with its traceback. sys.exit() raises SystemExit, which is not an
    # Exception subclass, so the exits inside main() pass through untouched.
    try:
        main(args)
    except Exception as exc:
        print_icinga2_check_status(f'exception "{exc}" \n {traceback.format_exc()}', nagios.STATE_UNKNOWN)
        sys.exit(nagios.STATE_UNKNOWN)
|
|
@ -0,0 +1,11 @@
|
||||||
|
def try_float(value: str) -> int | float | str:
|
||||||
|
try:
|
||||||
|
return float(value)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
raise ValueError(f"Could not convert {value} to float or int")
|
||||||
|
# return value
|
Loading…
Reference in New Issue