import argparse
import sys
import traceback
from enum import Enum

from pysnmp.hlapi import *

from checker import nagios
from checker.result import quit_check

"""
https://github.com/SnejPro/check_synology/blob/main/check_synology.py
"""


class SnmpMem(Enum):
    """OIDs queried by the memory check (values are used as kB counters)."""

    unused = '1.3.6.1.4.1.2021.4.6.0'
    total = '1.3.6.1.4.1.2021.4.5.0'
    cached = '1.3.6.1.4.1.2021.4.15.0'


class SnmpLoad(Enum):
    """OIDs queried by the load check."""

    min1 = '1.3.6.1.4.1.2021.10.1.3.1'
    min5 = '1.3.6.1.4.1.2021.10.1.3.2'
    min15 = '1.3.6.1.4.1.2021.10.1.3.3'
    # Table column (one row per processor); it is walked to count cores.
    core_stats = '1.3.6.1.2.1.25.3.3.1.2'


class SnmpStatus(Enum):
    """OIDs queried by the system status check."""

    model = '1.3.6.1.4.1.6574.1.5.1.0'
    serial = '1.3.6.1.4.1.6574.1.5.2.0'
    temperature = '1.3.6.1.4.1.6574.1.2.0'
    system = '1.3.6.1.4.1.6574.1.1.0'
    system_fan = '1.3.6.1.4.1.6574.1.4.1.0'
    cpu_fan = '1.3.6.1.4.1.6574.1.4.2.0'
    power = '1.3.6.1.4.1.6574.1.3.0'


class SnmpDisk(Enum):
    """Root OID of the disk table walked by the disks check."""

    disk = '1.3.6.1.4.1.6574.2.1.1'


def check_failed(value: str):
    """Translate a normal/failed status code into a check result.

    :param value: status code as returned by SNMP ('1' normal, '2' failed).
    :return: tuple ``(lower-cased description, int(value), nagios exit code)``.
    :raises ValueError: if ``value`` is not an integer string.
    """
    states = {
        '1': (nagios.STATE_OK, 'Normal'),
        '2': (nagios.STATE_CRIT, 'Failed'),
    }
    exit_code, output = states.get(value, (nagios.STATE_UNKNOWN, 'Unknown status replied'))
    return output.lower(), int(value), exit_code


def check_disk_status(value: str):
    """Translate a disk status code into a check result.

    :param value: disk status code as returned by SNMP ('1' .. '5').
    :return: tuple ``(lower-cased description, int(value), nagios exit code)``.
    :raises ValueError: if ``value`` is not an integer string.
    """
    states = {
        '1': (nagios.STATE_OK, 'Normal'),
        '2': (nagios.STATE_WARN, 'Initialized'),
        '3': (nagios.STATE_WARN, 'Not Initialized'),
        '4': (nagios.STATE_CRIT, 'System Partition Failed'),
        '5': (nagios.STATE_CRIT, 'Crashed'),
    }
    exit_code, output = states.get(value, (nagios.STATE_UNKNOWN, 'Unknown status replied'))
    return output.lower(), int(value), exit_code


def _format_snmp_error(error_status, error_index, var_binds):
    """Build the '<status> at <oid>' message for an SNMP protocol error."""
    # error_index is 1-based; a falsy index means no specific var-bind is blamed.
    offender = var_binds[int(error_index) - 1][0] if error_index else '?'
    return '%s at %s' % (error_status.prettyPrint(), offender)


def query_snmp(host, community, oid: str):
    """Perform a single SNMP GET against ``host`` on UDP port 161.

    :param host: host name or address of the SNMP agent.
    :param community: SNMP community string.
    :param oid: OID to fetch.
    :return: pretty-printed value of the first returned var-bind, or ``None``
        when the response carries no var-binds.
    :raises Exception: on transport errors (error indication) and on SNMP
        protocol errors (error status).
    """
    error_indication, error_status, error_index, var_binds = next(
        getCmd(SnmpEngine(),
               CommunityData(community),
               UdpTransportTarget((host, 161)),
               ContextData(),
               ObjectType(ObjectIdentity(oid)))
    )
    if error_indication:
        raise Exception(error_indication)
    if error_status:
        # The original built this exception without raising it, silently
        # ignoring protocol errors.
        raise Exception(_format_snmp_error(error_status, error_index, var_binds))
    for var_bind in var_binds:
        return var_bind[1].prettyPrint()
    return None


def walk_snmp(host, community, oid):
    """Walk the SNMP subtree rooted at ``oid`` on ``host`` (UDP port 161).

    :param host: host name or address of the SNMP agent.
    :param community: SNMP community string.
    :param oid: root OID; the walk stops when it leaves this subtree.
    :return: dict mapping ``str(oid)`` to the pretty-printed value, in walk order.
    :raises Exception: on transport errors (error indication) and on SNMP
        protocol errors (error status).
    """
    result = {}
    for error_indication, error_status, error_index, var_binds in nextCmd(
            SnmpEngine(),
            CommunityData(community),
            UdpTransportTarget((host, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(oid)),
            lexicographicMode=False):
        if error_indication:
            raise Exception(error_indication)
        if error_status:
            # The original built this exception without raising it.
            raise Exception(_format_snmp_error(error_status, error_index, var_binds))
        for var_bind in var_binds:
            result[str(var_bind[0])] = var_bind[1].prettyPrint()
    return result


def _check_mem(args):
    """Report memory usage and compare it to --warn-mem / --crit-mem."""
    mem_unused = int(query_snmp(args.host, args.community, SnmpMem.unused.value))
    mem_total = int(query_snmp(args.host, args.community, SnmpMem.total.value))
    mem_cached = int(query_snmp(args.host, args.community, SnmpMem.cached.value))

    mem_total_mb = mem_total // 1000
    mem_used_mb = (mem_total - mem_unused - mem_cached) // 1000
    if mem_total_mb == 0:
        # Avoid an opaque ZeroDivisionError below.
        raise Exception('device reported zero total memory')

    perfdata_dict = {
        'memory_total': {'value': mem_total_mb, 'unit': 'mb', 'min': 0},
        'memory_used': {'value': mem_used_mb, 'unit': 'mb', 'min': 0},
    }

    mem_used_percent = (mem_used_mb / mem_total_mb) * 100
    if mem_used_percent >= args.crit_mem:
        exit_code = nagios.STATE_CRIT
    elif mem_used_percent >= args.warn_mem:
        exit_code = nagios.STATE_WARN
    else:
        exit_code = nagios.STATE_OK
    quit_check(f'Memory usage is {mem_used_mb}/{mem_total_mb} MB ({int(mem_used_percent)}%)',
               exit_code, perfdata_dict)


def _check_load(args):
    """Report load averages, with thresholds scaled by the number of cores."""
    # core_stats is a table column, so a plain GET never returns per-core rows
    # (the original therefore always counted one core). Walk it and count the
    # rows; fall back to 1 if the agent exposes none.
    core_number = len(walk_snmp(args.host, args.community, SnmpLoad.core_stats.value)) or 1

    load_1min = float(query_snmp(args.host, args.community, SnmpLoad.min1.value))
    load_5min = float(query_snmp(args.host, args.community, SnmpLoad.min5.value))
    load_15min = float(query_snmp(args.host, args.community, SnmpLoad.min15.value))

    perfdata_dict = {
        'load_1min': {'value': load_1min, 'unit': '', 'min': 0},
        'load_5min': {'value': load_5min, 'unit': '', 'min': 0},
        'load_15min': {'value': load_15min, 'unit': '', 'min': 0},
    }

    if load_1min >= core_number * 4 or load_5min >= core_number * 2 or load_15min >= core_number:
        exit_code = nagios.STATE_CRIT
    elif load_1min >= core_number * 2 or load_5min >= core_number * 1.5 or load_15min >= core_number - 0.3:
        exit_code = nagios.STATE_WARN
    else:
        exit_code = nagios.STATE_OK
    quit_check(f'Load average: {load_1min}, {load_5min}, {load_15min}', exit_code, perfdata_dict)


def _check_status(args):
    """Report temperature plus system, fan, CPU fan and power status."""
    # status_model = query_snmp(args.host, args.community, SnmpStatus.model.value)
    # status_serial = query_snmp(args.host, args.community, SnmpStatus.serial.value)
    status_temp = query_snmp(args.host, args.community, SnmpStatus.temperature.value)
    status_system_output, status_system_value, status_system_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.system.value))
    status_fan_output, status_fan_value, status_fan_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.system_fan.value))
    status_fan_cpu_output, status_fan_cpu_value, status_fan_cpu_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.cpu_fan.value))
    status_power_output, status_power_value, status_power_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.power.value))

    # The overall result is the highest exit code among the components.
    exit_code = max(status_system_exit_code, status_fan_exit_code,
                    status_fan_cpu_exit_code, status_power_exit_code)

    quit_check(
        f'Temp: {status_temp}°C. System: {status_system_output}. Fan: {status_fan_output}. '
        f'CPU Fan: {status_fan_cpu_output}. Power: {status_power_output}',
        exit_code,
        {
            'status_system': {'value': status_system_value, 'unit': '', 'min': 0},
            'status_fan': {'value': status_fan_value, 'unit': '', 'min': 0},
            'status_fan_cpu': {'value': status_fan_cpu_value, 'unit': '', 'min': 0},
            'status_power': {'value': status_power_value, 'unit': '', 'min': 0},
            'temperature': {'value': status_temp, 'unit': 'C'},
        })


def _disk_index_sort_key(index: str):
    """Sort numeric row indices numerically, anything else after them."""
    if index.isdigit():
        return (0, int(index), index)
    return (1, 0, index)


def _group_disk_rows(table):
    """Group walked disk-table values into one list per disk.

    :param table: mapping of OID string to value, in walk order.
    :return: list of per-disk value lists, ordered by the row index (the last
        OID component). Values within a row keep their walk order.
    """
    rows = {}
    for oid, value in table.items():
        # Match the last OID component exactly; a suffix match would make row
        # '1' also collect the values of rows '11', '21', ...
        rows.setdefault(oid.rsplit('.', 1)[-1], []).append(value)
    return [rows[index] for index in sorted(rows, key=_disk_index_sort_key)]


def _check_disks(args):
    """Report the status and temperature of every disk in the disk table."""
    table = walk_snmp(args.host, args.community, SnmpDisk.disk.value)
    disk_rows = _group_disk_rows(table)
    if not disk_rows:
        quit_check('No disks found', nagios.STATE_UNKNOWN, {})
        return

    summaries = []
    perfdata_dict = {}
    exit_code = nagios.STATE_OK
    for number, row in enumerate(disk_rows, start=1):
        # Position 4 is read as the status column and 5 as the temperature.
        output, value, disk_exit_code = check_disk_status(row[4])
        # Keep the worst state seen so far; the original overwrote it, so only
        # the last disk decided the result.
        exit_code = max(exit_code, disk_exit_code)
        summaries.append(f'Disk {number}: {output}')
        perfdata_dict[f'disk_{number}_status'] = {'value': value, 'unit': '', 'min': 0}
        perfdata_dict[f'disk_{number}_temperature'] = {'value': row[5], 'unit': 'C'}
    quit_check(', '.join(summaries), exit_code, perfdata_dict)


_CHECKS = {
    'mem': _check_mem,
    'load': _check_load,
    'status': _check_status,
    'disks': _check_disks,
}


def main(args):
    """Run the check selected by ``args.choice``.

    :param args: parsed command-line arguments (``host``, ``community``,
        ``choice``, ``warn_mem``, ``crit_mem``).
    :raises NotImplementedError: for the 'storage' and 'network' choices.
    :raises Exception: for an unrecognised choice or any SNMP failure.
    """
    if args.choice in ('storage', 'network'):
        raise NotImplementedError
    check = _CHECKS.get(args.choice)
    if check is None:
        raise Exception(f'unknown choice: {args.choice}')
    check(args)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--host', required=True, help='The host to connect to.')
    parser.add_argument('-c', '--community', required=True, help='The community name.')
    parser.add_argument('-C', '--choice', required=True,
                        choices=['mem', 'load', 'status', 'disks', 'storage', 'network'],
                        help='What to check.')
    # argparse %-formats help strings, so a literal percent sign must be '%%'.
    parser.add_argument('--warn-mem', type=int, default=75,
                        help='Memory usage percent to warn at. Default: 75%%')
    parser.add_argument('--crit-mem', type=int, default=90,
                        help='Memory usage percent to crit at. Default: 90%%')
    args = parser.parse_args()

    try:
        main(args)
    except Exception as e:
        print(f'UNKNOWN - exception "{e}"')
        traceback.print_exc()
        sys.exit(nagios.STATE_UNKNOWN)