icinga2-checks/check_synology.py

245 lines
8.9 KiB
Python
Executable File

import argparse
import sys
import traceback
from enum import Enum
from pysnmp.hlapi import *
from checker import nagios
from checker.result import quit_check
"""
https://github.com/SnejPro/check_synology/blob/main/check_synology.py
"""
class SnmpMem(Enum):
unused = '1.3.6.1.4.1.2021.4.6.0'
total = '1.3.6.1.4.1.2021.4.5.0'
cached = '1.3.6.1.4.1.2021.4.15.0'
class SnmpLoad(Enum):
min1 = '1.3.6.1.4.1.2021.10.1.3.1'
min5 = '1.3.6.1.4.1.2021.10.1.3.2'
min15 = '1.3.6.1.4.1.2021.10.1.3.3'
core_stats = '1.3.6.1.2.1.25.3.3.1.2'
class SnmpStatus(Enum):
model = '1.3.6.1.4.1.6574.1.5.1.0'
serial = '1.3.6.1.4.1.6574.1.5.2.0'
temperature = '1.3.6.1.4.1.6574.1.2.0'
system = '1.3.6.1.4.1.6574.1.1.0'
system_fan = '1.3.6.1.4.1.6574.1.4.1.0'
cpu_fan = '1.3.6.1.4.1.6574.1.4.2.0'
power = '1.3.6.1.4.1.6574.1.3.0'
class SnmpDisk(Enum):
disk = '1.3.6.1.4.1.6574.2.1.1'
def check_failed(value: str):
if value == '1':
exit_code = nagios.STATE_OK
output = 'Normal'
elif value == '2':
exit_code = nagios.STATE_CRIT
output = 'Failed'
else:
exit_code = nagios.STATE_UNKNOWN
output = 'Unknown status replied'
return output.lower(), int(value), exit_code
def query_snmp(host, community, oid: str):
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData(community),
UdpTransportTarget((host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication:
raise Exception(error_indication)
elif error_status:
Exception('%s at %s' % (error_status.prettyPrint(),
error_index and var_binds[int(error_index) - 1][0] or '?'))
else:
for varBind in var_binds:
return varBind[1].prettyPrint()
def walk_snmp(host, community, oid):
result = {}
for (error_indication, error_status, error_index, var_binds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False):
if error_indication:
raise Exception(error_indication)
elif error_status:
Exception('%s at %s' % (error_status.prettyPrint(),
error_index and var_binds[int(error_index) - 1][0] or '?'))
else:
for varBind in var_binds:
result[str(varBind[0])] = varBind[1].prettyPrint()
return result
def main(args):
if args.choice == 'mem':
mem_unused = query_snmp(args.host, args.community, SnmpMem.unused.value)
mem_total = query_snmp(args.host, args.community, SnmpMem.total.value)
mem_total_mb = int(mem_total) // 1000
mem_cached = query_snmp(args.host, args.community, SnmpMem.cached.value)
mem_used_mb = (int(mem_total) - int(mem_unused) - int(mem_cached)) // 1000
perfdata_dict = {
'memory_total': {
'value': mem_total_mb,
'unit': 'mb',
'min': 0
},
'memory_used': {
'value': mem_used_mb,
'unit': 'mb',
'min': 0
}
}
mem_used_percent = (mem_used_mb / mem_total_mb) * 100
if mem_used_percent >= args.crit_mem:
exit_code = nagios.STATE_CRIT
elif mem_used_percent >= args.warn_mem:
exit_code = nagios.STATE_WARN
else:
exit_code = nagios.STATE_OK
quit_check(f'Memory usage is {mem_used_mb}/{mem_total_mb} MB ({int(mem_used_percent)}%)', exit_code, perfdata_dict)
elif args.choice == 'load':
core_stats = query_snmp(args.host, args.community, SnmpLoad.core_stats.value)
core_number = len(core_stats.split('\n'))
load_1min = float(query_snmp(args.host, args.community, SnmpLoad.min1.value))
load_5min = float(query_snmp(args.host, args.community, SnmpLoad.min5.value))
load_15min = float(query_snmp(args.host, args.community, SnmpLoad.min15.value))
perfdata_dict = {
'load_1min': {
'value': load_1min,
'unit': '',
'min': 0
},
'load_5min': {
'value': load_5min,
'unit': '',
'min': 0
},
'load_15min': {
'value': load_15min,
'unit': '',
'min': 0
}
}
if load_1min >= core_number * 4 or load_5min >= core_number * 2 or load_15min >= core_number:
exit_code = nagios.STATE_CRIT
elif load_1min >= core_number * 2 or load_5min >= core_number * 1.5 or load_15min >= core_number - 0.3:
exit_code = nagios.STATE_WARN
else:
exit_code = nagios.STATE_OK
quit_check(f'Load average: {load_1min}, {load_5min}, {load_15min}', exit_code, perfdata_dict)
elif args.choice == 'status':
# status_model = query_snmp(args.host, args.community, SnmpStatus.model.value)
# status_serial = query_snmp(args.host, args.community, SnmpStatus.serial.value)
status_temp = query_snmp(args.host, args.community, SnmpStatus.temperature.value)
status_system_output, status_system_value, status_system_exit_code = check_failed(query_snmp(args.host, args.community, SnmpStatus.system.value))
status_fan_output, status_fan_value, status_fan_exit_code = check_failed(query_snmp(args.host, args.community, SnmpStatus.system_fan.value))
status_fan_cpu_output, status_fan_cpu_value, status_fan_cpu_exit_code = check_failed(query_snmp(args.host, args.community, SnmpStatus.cpu_fan.value))
status_power_output, status_power_value, status_power_exit_code = check_failed(query_snmp(args.host, args.community, SnmpStatus.power.value))
exit_code = max(status_system_exit_code, status_fan_exit_code, status_fan_cpu_exit_code, status_power_exit_code)
quit_check(f'Temp: {status_temp}°C. System: {status_system_output}. Fan: {status_fan_output}. CPU Fan: {status_fan_cpu_output}. Power: {status_power_output}', exit_code, {
'status_system': {
'value': status_system_value,
'unit': '',
'min': 0
},
'status_fan': {
'value': status_fan_value,
'unit': '',
'min': 0
},
'status_fan_cpu': {
'value': status_fan_cpu_value,
'unit': '',
'min': 0
},
'status_power': {
'value': status_power_value,
'unit': '',
'min': 0
},
'temperature': {
'value': status_temp,
'unit': 'C'
}
})
elif args.choice == 'disk':
data = walk_snmp(args.host, args.community, SnmpDisk.disk.value)
disk_indices = set(key.split('.')[-1] for key in data.keys())
disk_data = []
for index in disk_indices:
disk_info = []
for key, value in data.items():
if key.endswith(index):
disk_info.append(value)
disk_data.append(disk_info)
result_str = ''
perfdata_dict = {}
exit_code = nagios.STATE_OK
for disk, data in enumerate(disk_data):
output, value, d_exit_code = check_failed(data[4])
exit_code = d_exit_code
result_str = result_str + f'Disk {disk + 1}: {output}'
perfdata_dict[f'disk_{disk + 1}_status'] = {
'value': value,
'unit': '',
'min': 0
}
perfdata_dict[f'disk_{disk + 1}_temperature'] = {
'value': data[5],
'unit': 'C'
}
quit_check(result_str, exit_code, perfdata_dict)
elif args.choice == 'storage':
raise NotImplementedError
elif args.choice == 'network':
raise NotImplementedError
else:
raise Exception
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', required=True, help='The host to connect to.')
parser.add_argument('-c', '--community', required=True, help='The community name.')
parser.add_argument('-C', '--choice', required=True, choices=['mem', 'load', 'status', 'disk', 'storage', 'network'], help='What to check.')
parser.add_argument('--warn-mem', type=int, default=75, help='Memory usage percent to warn at. Default: 75%')
parser.add_argument('--crit-mem', type=int, default=90, help='Memory usage percent to crit at. Default: 90%')
args = parser.parse_args()
try:
main(args)
except Exception as e:
print(f'UNKNOWN - exception "{e}"')
traceback.print_exc()
sys.exit(nagios.STATE_UNKNOWN)