import argparse
import sys
import traceback
from enum import Enum

# from pycparser.c_ast import Union as PycUnion
# NOTE(review): the star import presumably supplies getCmd, nextCmd, SnmpEngine,
# CommunityData, UdpTransportTarget, ContextData, ObjectType and ObjectIdentity,
# which query_snmp()/walk_snmp() use without any other import - confirm.
from pysnmp.hlapi import *

from checker import nagios
from checker.result import quit_check
from checker.units import human_readable_size

# Reference implementation this script is based on (bare string, no runtime effect).
""" https://github.com/SnejPro/check_synology/blob/main/check_synology.py """


class SnmpMem(Enum):
    """OIDs read by the 'mem' check in main().

    main() computes used memory as ``total - unused - cached``.
    NOTE(review): these look like UCD-SNMP-MIB memory scalars - confirm.
    """
    unused = '1.3.6.1.4.1.2021.4.6.0'
    total = '1.3.6.1.4.1.2021.4.5.0'
    cached = '1.3.6.1.4.1.2021.4.15.0'


class SnmpLoad(Enum):
    """OIDs read by the 'load' check in main()."""
    min1 = '1.3.6.1.4.1.2021.10.1.3.1'
    min5 = '1.3.6.1.4.1.2021.10.1.3.2'
    min15 = '1.3.6.1.4.1.2021.10.1.3.3'
    # Unlike the other members this OID carries no instance suffix; main() uses
    # the reply to derive the number of cores.
    core_stats = '1.3.6.1.2.1.25.3.3.1.2'


class SnmpStatus(Enum):
    """OIDs read by the 'status' check in main().

    ``model`` and ``serial`` are declared but only referenced from
    commented-out code in main().
    """
    model = '1.3.6.1.4.1.6574.1.5.1.0'
    serial = '1.3.6.1.4.1.6574.1.5.2.0'
    temperature = '1.3.6.1.4.1.6574.1.2.0'
    system = '1.3.6.1.4.1.6574.1.1.0'
    system_fan = '1.3.6.1.4.1.6574.1.4.1.0'
    cpu_fan = '1.3.6.1.4.1.6574.1.4.2.0'
    power = '1.3.6.1.4.1.6574.1.3.0'


class SnmpDisk(Enum):
    """Subtree walked by the 'disks' check in main()."""
    disk = '1.3.6.1.4.1.6574.2.1.1'


class SnmpStorage(Enum):
    """OIDs used by the 'storage' check in main().

    ``desc`` is walked to list storage entries; the ``*_pre`` members end with
    a trailing dot because main() appends the entry id to them before querying.
    """
    desc = '1.3.6.1.2.1.25.2.3.1.3'
    allocated_pre = '1.3.6.1.2.1.25.2.3.1.4.'
    size_pre = '1.3.6.1.2.1.25.2.3.1.5.'
    used_pre = '1.3.6.1.2.1.25.2.3.1.6.'
def format_bytes(size: int, unit):
    """Scale *size* down by powers of 1000 and attach a K/M/G/T prefix.

    Args:
        size: The raw quantity to scale.
        unit: Unit suffix appended after the prefix (e.g. ``'B'``).

    Returns:
        A tuple ``(scaled_value, prefixed_unit, human_readable_string)``.
        ``scaled_value`` stays an ``int`` when no scaling was needed.
    """
    power = 10 ** 3
    power_labels = {0: '', 1: 'K', 2: 'M', 3: 'G', 4: 'T'}
    largest = max(power_labels)
    n = 0
    # Stop at the largest known prefix instead of running off the label table,
    # and use >= so that exact multiples (1000 -> 1.0 K) are scaled as well.
    while size >= power and n < largest:
        size /= power
        n += 1
    label = power_labels[n] + unit
    return size, label, str(round(size, 2)) + ' ' + label


def check_failed(value: str):
    """Translate a normal/failed SNMP status value into a check result.

    Args:
        value: The status as returned by query_snmp(); '1' is treated as
            normal and '2' as failed, anything else as unknown.

    Returns:
        A tuple ``(lowercase_description, int(value), nagios_exit_code)``.

    Raises:
        ValueError/TypeError: If *value* cannot be converted with ``int()``.
    """
    # Built inside the function so the nagios constants are resolved at call time.
    states = {
        '1': (nagios.STATE_OK, 'Normal'),
        '2': (nagios.STATE_CRIT, 'Failed'),
    }
    exit_code, output = states.get(value, (nagios.STATE_UNKNOWN, 'Unknown status replied'))
    return output.lower(), int(value), exit_code


def check_disk_status(value: str):
    """Translate a disk status SNMP value into a check result.

    Args:
        value: The disk status as returned by the SNMP walk ('1'..'5').

    Returns:
        A tuple ``(lowercase_description, int(value), nagios_exit_code)``.

    Raises:
        ValueError/TypeError: If *value* cannot be converted with ``int()``.
    """
    states = {
        '1': (nagios.STATE_OK, 'Normal'),
        '2': (nagios.STATE_WARN, 'Initialized'),
        '3': (nagios.STATE_WARN, 'Not Initialized'),
        '4': (nagios.STATE_CRIT, 'System Partition Failed'),
        '5': (nagios.STATE_CRIT, 'Crashed'),
    }
    exit_code, output = states.get(value, (nagios.STATE_UNKNOWN, 'Unknown status replied'))
    return output.lower(), int(value), exit_code


def _raise_on_snmp_error(error_indication, error_status, error_index, var_binds):
    """Raise an Exception when a pysnmp response tuple reports an error."""
    if error_indication:
        raise Exception(error_indication)
    if error_status:
        # error_index is 1-based; fall back to '?' when no culprit is given.
        culprit = var_binds[int(error_index) - 1][0] if error_index else '?'
        raise Exception('%s at %s' % (error_status.prettyPrint(), culprit))


def query_snmp(host, community, oid: str):
    """Run a single SNMP GET against *host* on UDP port 161.

    Args:
        host: Hostname or address of the device.
        community: SNMP community string.
        oid: The OID to fetch.

    Returns:
        The pretty-printed value of the first returned var-bind, or ``None``
        when the response carried no var-binds.

    Raises:
        Exception: On a transport error or when the response reports an error
            status (the latter used to be constructed but never raised).
    """
    error_indication, error_status, error_index, var_binds = next(
        getCmd(SnmpEngine(),
               CommunityData(community),
               UdpTransportTarget((host, 161)),
               ContextData(),
               ObjectType(ObjectIdentity(oid)))
    )
    _raise_on_snmp_error(error_indication, error_status, error_index, var_binds)
    for var_bind in var_binds:
        return var_bind[1].prettyPrint()
    return None


def walk_snmp(host, community, oid):
    """Walk the SNMP subtree below *oid* on *host* (UDP port 161).

    Args:
        host: Hostname or address of the device.
        community: SNMP community string.
        oid: Root OID of the subtree to walk.

    Returns:
        A dict mapping each returned OID (as ``str``) to its pretty-printed
        value, in the order the device returned them.

    Raises:
        Exception: On a transport error or when a response reports an error
            status (the latter used to be constructed but never raised).
    """
    result = {}
    responses = nextCmd(
        SnmpEngine(),
        CommunityData(community),
        UdpTransportTarget((host, 161)),
        ContextData(),
        ObjectType(ObjectIdentity(oid)),
        lexicographicMode=False)
    for error_indication, error_status, error_index, var_binds in responses:
        _raise_on_snmp_error(error_indication, error_status, error_index, var_binds)
        for var_bind in var_binds:
            result[str(var_bind[0])] = var_bind[1].prettyPrint()
    return result


def parse_walk(data: dict):
    """Group the values of a table walk by row index.

    The row index is the last dot-separated component of each OID key.
    Matching on the exact component (rather than a string suffix) keeps row
    ``1`` from also collecting the values of rows ``11``, ``21``, ...

    Args:
        data: Mapping of OID string to value, e.g. from walk_snmp().

    Returns:
        A list with one list of values per row. Values keep the order in which
        they appear in *data*; rows are sorted by numeric index (non-numeric
        indices sort last) so the result is stable between runs.
    """
    rows = {}
    for key, value in data.items():
        rows.setdefault(key.split('.')[-1], []).append(value)

    def _row_order(index):
        return (0, int(index)) if index.isdigit() else (1, index)

    return [rows[index] for index in sorted(rows, key=_row_order)]


def _check_mem(args):
    """Report memory usage against --warn-mem/--crit-mem."""
    mem_unused = int(query_snmp(args.host, args.community, SnmpMem.unused.value))
    mem_total = int(query_snmp(args.host, args.community, SnmpMem.total.value))
    mem_cached = int(query_snmp(args.host, args.community, SnmpMem.cached.value))
    mem_total_mb = mem_total // 1000
    mem_used_mb = (mem_total - mem_unused - mem_cached) // 1000
    if mem_total_mb <= 0:
        # Avoid a bare ZeroDivisionError below; the caller reports this as UNKNOWN.
        raise Exception('device reported no total memory')
    perfdata_dict = {
        'memory_total': {'value': mem_total_mb, 'unit': 'mb', 'min': 0},
        'memory_used': {'value': mem_used_mb, 'unit': 'mb', 'min': 0},
    }
    mem_used_percent = (mem_used_mb / mem_total_mb) * 100
    if mem_used_percent >= args.crit_mem:
        exit_code = nagios.STATE_CRIT
    elif mem_used_percent >= args.warn_mem:
        exit_code = nagios.STATE_WARN
    else:
        exit_code = nagios.STATE_OK
    quit_check(f'Memory usage is {mem_used_mb}/{mem_total_mb} MB ({int(mem_used_percent)}%)', exit_code, perfdata_dict)


def _check_load(args):
    """Report the load averages, with thresholds scaled by the core count."""
    # core_stats is a table column (one row per processor), so it has to be
    # walked; a single GET on it yields one line of text and therefore always
    # produced a core count of 1. Fall back to 1 if the walk returns nothing.
    core_number = len(walk_snmp(args.host, args.community, SnmpLoad.core_stats.value)) or 1
    load_1min = float(query_snmp(args.host, args.community, SnmpLoad.min1.value))
    load_5min = float(query_snmp(args.host, args.community, SnmpLoad.min5.value))
    load_15min = float(query_snmp(args.host, args.community, SnmpLoad.min15.value))
    perfdata_dict = {
        'load_1min': {'value': load_1min, 'unit': '', 'min': 0},
        'load_5min': {'value': load_5min, 'unit': '', 'min': 0},
        'load_15min': {'value': load_15min, 'unit': '', 'min': 0},
    }
    if load_1min >= core_number * 4 or load_5min >= core_number * 2 or load_15min >= core_number:
        exit_code = nagios.STATE_CRIT
    elif load_1min >= core_number * 2 or load_5min >= core_number * 1.5 or load_15min >= core_number - 0.3:
        exit_code = nagios.STATE_WARN
    else:
        exit_code = nagios.STATE_OK
    quit_check(f'Load average: {load_1min}, {load_5min}, {load_15min}', exit_code, perfdata_dict)


def _check_status(args):
    """Report temperature plus system, fan, CPU fan and power status."""
    status_temp = query_snmp(args.host, args.community, SnmpStatus.temperature.value)
    status_system_output, status_system_value, status_system_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.system.value))
    status_fan_output, status_fan_value, status_fan_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.system_fan.value))
    status_fan_cpu_output, status_fan_cpu_value, status_fan_cpu_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.cpu_fan.value))
    status_power_output, status_power_value, status_power_exit_code = check_failed(
        query_snmp(args.host, args.community, SnmpStatus.power.value))
    # The worst individual state decides the overall result.
    exit_code = max(status_system_exit_code, status_fan_exit_code, status_fan_cpu_exit_code, status_power_exit_code)
    perfdata_dict = {
        'status_system': {'value': status_system_value, 'unit': '', 'min': 0},
        'status_fan': {'value': status_fan_value, 'unit': '', 'min': 0},
        'status_fan_cpu': {'value': status_fan_cpu_value, 'unit': '', 'min': 0},
        'status_power': {'value': status_power_value, 'unit': '', 'min': 0},
        'temperature': {'value': status_temp, 'unit': 'C'},
    }
    quit_check(f'Temp: {status_temp}°C. System: {status_system_output}. Fan: {status_fan_output}. CPU Fan: {status_fan_cpu_output}. Power: {status_power_output}', exit_code, perfdata_dict)


def _check_disks(args):
    """Report the status and temperature of every disk found by the walk."""
    disk_rows = parse_walk(walk_snmp(args.host, args.community, SnmpDisk.disk.value))
    if not disk_rows:
        # An empty walk must not be reported as OK with an empty message.
        raise Exception('no disks returned by the device')
    summaries = []
    perfdata_dict = {}
    exit_code = nagios.STATE_OK
    for number, row in enumerate(disk_rows, start=1):
        # NOTE(review): positions 4 and 5 are taken to be the status and
        # temperature columns in walk order - confirm against the device MIB.
        output, value, d_exit_code = check_disk_status(row[4])
        # Keep the worst state seen so far; previously the last disk
        # overwrote the result of all earlier ones.
        exit_code = max(exit_code, d_exit_code)
        summaries.append(f'Disk {number}: {output}.')
        perfdata_dict[f'disk_{number}_status'] = {'value': value, 'unit': '', 'min': 0}
        perfdata_dict[f'disk_{number}_temperature'] = {'value': row[5], 'unit': 'C'}
    quit_check(' '.join(summaries), exit_code, perfdata_dict)


def _check_storage(args):
    """Report usage of every '/volume*' storage entry against the thresholds."""
    store_list = walk_snmp(args.host, args.community, SnmpStorage.desc.value)
    crit_at = float(args.crit_store)
    warn_at = float(args.warn_store)
    summaries = []
    perfdata_dict = {}
    exit_codes = []
    for oid, name in store_list.items():
        if not name.startswith('/volume'):
            continue
        store_id = oid.split('.')[-1]
        # Size and usage are reported in allocation units; multiply to get bytes.
        allocated = int(query_snmp(args.host, args.community, SnmpStorage.allocated_pre.value + store_id))
        size = int(query_snmp(args.host, args.community, SnmpStorage.size_pre.value + store_id)) * allocated
        used = int(query_snmp(args.host, args.community, SnmpStorage.used_pre.value + store_id)) * allocated
        # A zero-sized entry would otherwise abort the whole check.
        used_percent = round((used / size) * 100, 1) if size else 0.0
        summaries.append(f'{name}: {human_readable_size(used, decimal_places=0)}/{human_readable_size(size, decimal_places=0)} ({used_percent}%).')
        perf_name = name.replace('/', '')
        perfdata_dict[f'{perf_name}_size'] = {'value': size, 'unit': 'B', 'min': 0}
        perfdata_dict[f'{perf_name}_used'] = {'value': used, 'unit': 'B', 'min': 0}
        perfdata_dict[f'{perf_name}_used_percent'] = {'value': used_percent, 'unit': '%', 'min': 0}
        if used_percent >= crit_at:
            exit_codes.append(nagios.STATE_CRIT)
        elif used_percent >= warn_at:
            exit_codes.append(nagios.STATE_WARN)
        else:
            exit_codes.append(nagios.STATE_OK)
    if not exit_codes:
        # max() of an empty list would raise an unhelpful ValueError.
        raise Exception('no /volume storage entries returned by the device')
    quit_check(' '.join(summaries), max(exit_codes), perfdata_dict)


def main(args):
    """Run the check selected by ``args.choice`` and report it via quit_check().

    Args:
        args: Parsed command line namespace providing ``host``, ``community``,
            ``choice`` and the warn/crit thresholds.

    Raises:
        NotImplementedError: For the 'network' choice.
        Exception: For an unknown choice, SNMP errors, or unusable replies.
    """
    if args.choice == 'network':
        raise NotImplementedError('the network check is not implemented')
    handlers = {
        'mem': _check_mem,
        'load': _check_load,
        'status': _check_status,
        'disks': _check_disks,
        'storage': _check_storage,
    }
    handler = handlers.get(args.choice)
    if handler is None:
        raise Exception(f'unknown choice "{args.choice}"')
    handler(args)


def validate_int_float_arg(arg, name):
    """Ensure *arg* can be interpreted as a number.

    Args:
        arg: The raw argument value (string from the CLI or numeric default).
        name: Option name used in the error message.

    Returns:
        The value converted to ``float``.

    Raises:
        Exception: If *arg* is not convertible to ``float``.
    """
    try:
        return float(arg)
    except (TypeError, ValueError):
        raise Exception(f'{name} must be an int or float') from None


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--host', required=True, help='The host to connect to.')
    parser.add_argument('-c', '--community', required=True, help='The community name.')
    parser.add_argument('-C', '--choice', required=True, choices=['mem', 'load', 'status', 'disks', 'storage', 'network'], help='What to check.')
    # argparse %-formats help strings, so a literal percent sign must be written
    # as %%; %(default)s keeps the text in sync with the actual default.
    parser.add_argument('--warn-mem', type=int, default=75, help='Memory usage percent to warn at. Default: %(default)s%%')
    parser.add_argument('--crit-mem', type=int, default=90, help='Memory usage percent to crit at. Default: %(default)s%%')
    parser.add_argument('--warn-store', default=75, help='Storage usage percent to warn at. Default: %(default)s%%')
    parser.add_argument('--crit-store', default=90, help='Storage usage percent to crit at. Default: %(default)s%%')
    args = parser.parse_args()

    try:
        # Validate inside the try so a bad threshold is reported as UNKNOWN too.
        validate_int_float_arg(args.warn_store, '--warn-store')
        validate_int_float_arg(args.crit_store, '--crit-store')
        main(args)
    except Exception as e:
        print(f'UNKNOWN - exception "{e}"')
        traceback.print_exc()
        sys.exit(nagios.STATE_UNKNOWN)