#!/usr/bin/env python3
import argparse
import subprocess
import sys
import traceback
from pathlib import Path
from checker import print_icinga2_check_status , nagios
from checker . linuxfabric . base import get_state
from checker . types import try_float
# Absolute path of the NUT client binary that is executed to query the UPS.
UPSC_PATH = '/usr/bin/upsc'

# Variables that are dropped outright before perfdata is built.
UNWANTED_STATS = [
    'ups.mfr',
    'ups.productid',
    'device.serial',
    'ups.vendorid',
    'device.type',
    'battery.type',
    'ups.serial',
    'device.mfr',
    'battery.mfr.date',
]

# Variables whose name starts with one of these prefixes are dropped as well.
UNWANTED_STATS_STARTSWITH = [
    'ups.beeper.',
    'ups.delay.',
    'ups.timer.',
]

# Variables that are pulled out of the stats and used as threshold levels
# instead of being reported as perfdata themselves.
STATS_LEVELS = [
    'battery.charge.low',
    'battery.charge.warning',
    'input.transfer.high',
    'input.transfer.low',
    'input.voltage.nominal',
    'ups.realpower.nominal',
    'battery.runtime.low',
    'battery.voltage.nominal',
]

# Comparison operator name handed to get_state() for each perfdata metric.
# Metrics that are not listed here fall back to 'le' at the call site.
VALUE_COMPARISONS = {
    'load': 'ge',
    'battery_charge': 'le',
    'battery_runtime': 'le',
    'battery_voltage': 'le',
    'input_voltage': 'le',
    'output_voltage': 'le',
}

# Unit attached to each perfdata metric; unlisted metrics get no unit.
VALUE_UNITS = {
    'load': '%',
    'battery_charge': '%',
    'battery_runtime': 's',
    'battery_voltage': 'V',
    'input_voltage': 'V',
    'output_voltage': 'V',
}
def parse_ups_status(status: str):
    """Translate a single ``ups.status`` flag into a description and a state.

    The flag is matched case-insensitively.

    :param status: one status token, e.g. ``OL`` or ``CHRG``.
    :return: ``(description, nagios_state)``; flags that are not in the table
        yield ``('unknown', nagios.STATE_UNKNOWN)``.
    """
    flag = status.lower()
    # https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data
    known_flags = {
        'ol': ('on line', nagios.STATE_OK),
        'ob': ('on battery', nagios.STATE_CRIT),
        'lb': ('low battery', nagios.STATE_WARN),
        'hb': ('high battery', nagios.STATE_CRIT),
        'rb': ('replace battery', nagios.STATE_WARN),
        'chrg': ('battery charging', nagios.STATE_OK),
        # inverter is providing load power
        'dischrg': ('battery discharging', nagios.STATE_WARN),
        'bypass': ('battery bypass', nagios.STATE_WARN),
        'cal': ('calibrating', nagios.STATE_WARN),
        'off': ('off', nagios.STATE_CRIT),
        'trim': ('trimming incoming voltage', nagios.STATE_WARN),
        'boost': ('boosting incoming voltage', nagios.STATE_WARN),
        'fsd': ('forced shutdown', nagios.STATE_CRIT),
    }
    fallback = ('unknown', nagios.STATE_UNKNOWN)
    return known_flags.get(flag, fallback)
def calc_voltage_wiggle(percent: float, nominal_voltage):
    """Return the nominal voltage lowered by a fraction of itself.

    :param percent: allowed deviation expressed as a fraction (``0.1`` for
        ten percent), not as a whole-number percentage.
    :param nominal_voltage: the nominal voltage to derive the limit from.
    :return: ``nominal_voltage`` minus ``nominal_voltage * percent``, i.e. the
        lower bound only.
    """
    return nominal_voltage - nominal_voltage * percent
def parse_upsc(output: str):
    """Split ``upsc`` output into its meaningful lines.

    :param output: decoded text captured from the ``upsc`` command.
    :return: the lines of ``output`` in order, without empty lines and without
        lines beginning with the ``Init SSL without certificate database``
        notice.
    """
    ssl_notice = 'Init SSL without certificate database'
    kept = []
    for line in output.split('\n'):
        if line == '' or line.startswith(ssl_notice):
            continue
        kept.append(line)
    return kept
def _whole_as_int(value):
    """Return ``value`` as an ``int`` when it is a whole-number float, else unchanged."""
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return value


def _exit_with(message, state):
    """Report ``message`` with ``state`` and terminate the check with that exit code."""
    print_icinga2_check_status(message, state)
    sys.exit(state)


def main(args):
    """Query a UPS through ``upsc`` and report its health as a Nagios-style check.

    The result is reported through ``print_icinga2_check_status`` and the
    process always terminates via ``sys.exit`` with the resulting state.

    :param args: parsed command-line namespace. The attributes read are
        ``path``, ``load_warn``, ``load_crit``, ``runtime_warn``,
        ``wiggle_warn``, ``wiggle_crit``, ``ignore_bad_runtime`` and
        ``ignore_bad_test``.
    """
    if not Path(UPSC_PATH).is_file():
        _exit_with(f'Could not find "{UPSC_PATH}". Make sure "nut-client" is installed.', nagios.STATE_UNKNOWN)

    try:
        # An argv list (no shell) keeps args.path from being interpreted as
        # shell syntax. stderr is merged into stdout so diagnostics end up in
        # the captured output, as the previous shell redirection did.
        raw_output = subprocess.check_output([UPSC_PATH, args.path], stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        lines = parse_upsc((e.output or b'').decode(errors='replace'))
        if lines and 'data stale' in lines[0].lower():
            _exit_with('Failed to get UPS status: data stale', nagios.STATE_CRIT)
        _exit_with(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN)
    except Exception as e:
        _exit_with(f'Failed to get UPS status: {e}', nagios.STATE_UNKNOWN)

    # Each line has the form "name: value"; only the first separator splits,
    # and lines without one are skipped instead of raising.
    ups_stats = {}
    for line in parse_upsc(raw_output.decode(errors='replace')):
        key, sep, value = line.partition(': ')
        if sep:
            ups_stats[key] = value

    # Remove unwanted stats.
    unwanted_prefixes = ('driver.', *UNWANTED_STATS_STARTSWITH)
    ups_stats = {
        k: v for k, v in ups_stats.items()
        if k not in UNWANTED_STATS and not k.startswith(unwanted_prefixes)
    }

    # Grab the levels from the stats.
    levels = {name: try_float(ups_stats.pop(name)) for name in STATS_LEVELS if name in ups_stats}

    # Grab the test results (None when the UPS does not report any).
    ups_test_result = ups_stats.pop('ups.test.result', None) or None

    # Grab the UPS status; it is a whitespace-separated list of flags.
    raw_status = ups_stats.pop('ups.status', None)
    if not raw_status:
        _exit_with('Failed to get UPS status: "ups.status" was not reported', nagios.STATE_UNKNOWN)
    ups_status = raw_status.split()

    # The model strings are not numeric, so keep them out of the perfdata.
    ups_stats.pop('device.model', None)
    ups_stats.pop('ups.model', None)

    # Easier to read.
    if 'ups.load' in ups_stats:
        ups_stats['load'] = ups_stats.pop('ups.load')

    # Load the perfdata.
    perf_data = {}
    for k, v in ups_stats.items():
        name = k.replace('.', '_')
        perf_data[name] = {'value': try_float(v), 'warn': None, 'crit': None, 'min': 0, 'unit': VALUE_UNITS.get(name)}

    # Set the perfdata values based on the levels. A UPS may omit any of
    # these variables, so every lookup tolerates a missing key.
    if 'battery_charge' in perf_data:
        perf_data['battery_charge']['warn'] = levels.get('battery.charge.warning')
        perf_data['battery_charge']['crit'] = levels.get('battery.charge.low')
    if 'battery_runtime' in perf_data and levels.get('battery.runtime.low'):
        perf_data['battery_runtime']['warn'] = args.runtime_warn
        perf_data['battery_runtime']['crit'] = levels['battery.runtime.low']

    # Set wiggle values (whole-number percentages converted to fractions).
    wiggle_warn = args.wiggle_warn * 0.01
    wiggle_crit = args.wiggle_crit * 0.01
    voltage_nominals = {
        'input_voltage': levels.get('input.voltage.nominal'),
        'battery_voltage': levels.get('battery.voltage.nominal'),
        # The output voltage is judged against the nominal input voltage.
        'output_voltage': levels.get('input.voltage.nominal'),
    }
    for metric, nominal in voltage_nominals.items():
        if metric in perf_data and nominal:
            perf_data[metric]['warn'] = calc_voltage_wiggle(wiggle_warn, nominal)
            perf_data[metric]['crit'] = calc_voltage_wiggle(wiggle_crit, nominal)

    # Set the perfdata values based on the input args.
    if 'load' in perf_data:
        perf_data['load']['warn'] = args.load_warn
        perf_data['load']['crit'] = args.load_crit

    # Determine our exit code based on the perfdata.
    exit_code = nagios.STATE_OK
    exit_msg = []
    for metric, value in perf_data.items():
        comparison = VALUE_COMPARISONS.get(metric, 'le')
        state = get_state(value['value'], value['warn'], value['crit'], comparison)
        if state == nagios.STATE_OK:
            continue
        if metric == 'battery_runtime' and args.ignore_bad_runtime:
            continue
        name = ' '.join(metric.split('_'))
        exit_msg.append(f'{name} is {_whole_as_int(value["value"])}')
        exit_code = max(exit_code, state)

    # Determine our exit code based on the self-test results. A UPS that
    # reports no test result at all is not treated as a failed test.
    if not args.ignore_bad_test and ups_test_result is not None:
        if ups_test_result.lower() not in ('done and passed', 'no test initiated'):
            exit_code = nagios.STATE_CRIT
            exit_msg.insert(0, f'test failed: "{ups_test_result}"')

    # Determine our exit code based on the UPS status.
    status_msg = []
    ok_msg = []
    for status in ups_status:
        msg, code = parse_ups_status(status)
        if code != nagios.STATE_OK:
            status_msg.append(msg)
            exit_code = max(exit_code, code)
        else:
            ok_msg.append(msg)

    if status_msg:
        status_text = 'status: ' + ', '.join(status_msg)
    else:
        # Healthy status: summarise the battery instead, using whichever of
        # the two battery readings the UPS actually provided.
        summary = []
        charge = perf_data.get('battery_charge', {}).get('value')
        if isinstance(charge, (int, float)):
            summary.append(f'{int(charge)}% charge')
        runtime = perf_data.get('battery_runtime', {}).get('value')
        if isinstance(runtime, (int, float)):
            minutes = _whole_as_int(round(runtime / 60, 1))
            summary.append(f'has {minutes} minutes of runtime')
        status_text = ' and '.join(summary) if summary else 'status: ' + ', '.join(ok_msg)

    text_result = f'{status_text}.'
    if exit_msg:
        text_result += ' ' + ', '.join(exit_msg).capitalize()
    print_icinga2_check_status(text_result, exit_code, perf_data)
    sys.exit(exit_code)
def _parse_cli_args():
    """Build the command-line parser for this check and parse the process arguments."""
    cli = argparse.ArgumentParser(description="Check NUT.")
    cli.add_argument("--path", required=True,
                     help="Path to the UPS to check. Example: `ups_name@localhost`")
    cli.add_argument("--load-warn", default=75, type=int,
                     help="Load warning level.")
    cli.add_argument("--load-crit", default=85, type=int,
                     help="Load critical level.")
    cli.add_argument("--runtime-warn", default=None, type=int,
                     help="Manually set the battery runtime warning level. Default: disabled")
    cli.add_argument("--wiggle-warn", default=5, type=int,
                     help="Value to use to determine warning level if a voltage changes this percentage from the nominal input.")
    cli.add_argument("--wiggle-crit", default=10, type=int,
                     help="Value to use to determine critical level if a voltage changes this percentage from the nominal input.")
    cli.add_argument("--ignore-bad-runtime", action='store_true',
                     help="Ignore the battery runtime value. Useful when you know the battery is getting old.")
    cli.add_argument("--ignore-bad-test", action='store_true',
                     help="Ignore bad test results.")
    return cli.parse_args()


if __name__ == "__main__":
    args = _parse_cli_args()
    try:
        main(args)
    except Exception as e:
        # Last-resort boundary: any unexpected error is reported as UNKNOWN
        # together with its traceback.
        print_icinga2_check_status(f'exception "{e}" \n {traceback.format_exc()}', nagios.STATE_UNKNOWN)
        sys.exit(nagios.STATE_UNKNOWN)