#!/usr/bin/env python3
"""Nagios-style check that reports failed SMART attributes recorded by Scrutiny.

Local disks are enumerated with ``smartctl``; each disk's WWN is then looked
up on a Scrutiny endpoint and the attributes Scrutiny flags are summarised as
a status line, a per-disk listing and perfdata, with a matching exit code.
"""
import argparse
import subprocess
import sys
from datetime import datetime, timedelta
from typing import List

import dateparser
import pytz

from checker import nagios
from checker.http import fetch_with_retry


def get_disk_wwn_ids(ignore_non_smart: bool = False) -> List[str]:
    """Return the WWN IDs (``0x``-prefixed) of all local disks with SMART enabled.

    Devices come from ``sudo smartctl --scan``; each one is then queried with
    ``smartctl -i``. Devices whose info output lacks
    ``SMART support is: Enabled`` are skipped.

    :param ignore_non_smart: when True, a device whose ``smartctl -i`` call
        fails is skipped instead of aborting the check.
    :return: list of WWN strings such as ``0x5000c500a1b2c3d4``.

    Exits the process with ``nagios.UNKNOWN`` when ``smartctl`` fails (and the
    failure is not ignored) or when a SMART-enabled device reports no WWN.
    """
    wwn_ids = []
    try:
        scan_output = subprocess.check_output(["sudo", "smartctl", "--scan"])
    except subprocess.CalledProcessError as e:
        print(f"UNKNOWN: subprocess Error - {e}")
        sys.exit(nagios.UNKNOWN)

    for scan_line in scan_output.decode("utf-8").strip().split("\n"):
        parts = scan_line.split()
        if len(parts) < 3:
            continue
        device = parts[0]
        device_type = parts[2].replace('scsi', 'sat,auto')
        try:
            smart_info = subprocess.check_output(
                ["sudo", "smartctl", "-i", device, "-d", device_type]).decode("utf-8")
        except subprocess.CalledProcessError as e:
            if ignore_non_smart:
                continue
            print(f"UNKNOWN: subprocess Error - {e}")
            sys.exit(nagios.UNKNOWN)

        if "SMART support is: Enabled" not in smart_info:
            # TODO: warn if a drive doesn't support SMART
            continue

        wwn_lines = [info_line for info_line in smart_info.split("\n") if "LU WWN Device Id" in info_line]
        if not wwn_lines:
            # Without a WWN the disk cannot be looked up; fail with a clear
            # message instead of an IndexError.
            print(f"UNKNOWN: no LU WWN Device Id reported for {device}")
            sys.exit(nagios.UNKNOWN)
        wwn_ids.append('0x' + wwn_lines[0].replace('LU WWN Device Id: ', '').replace(' ', ''))
    return wwn_ids


def get_smart_health(wwn_id: str, scrutiny_endpoint: str) -> dict:
    """Fetch ``/api/device/<wwn_id>/details`` from Scrutiny.

    :param wwn_id: WWN of the disk to look up.
    :param scrutiny_endpoint: base URL of the Scrutiny instance (no trailing slash).
    :return: the decoded JSON body on HTTP 200; otherwise a dict with a single
        ``fetch_error`` key describing the failure.
    """
    url = f"{scrutiny_endpoint}/api/device/{wwn_id}/details"
    response = fetch_with_retry(url)
    if response.status_code == 200:
        return response.json()
    if response.status_code == 404:
        return {
            'fetch_error': f"Disk {wwn_id} not found on Scrutiny"
        }
    return {
        'fetch_error': f"Error {response.status_code} for disk {wwn_id}: {response.text}"
    }


def _plural(count: int, word: str) -> str:
    """Return ``word`` with an ``s`` appended when ``count`` is greater than one."""
    return f'{word}s' if count > 1 else word


def _print_disk_section(heading: str, disks: dict, html: bool, dt: str, dts: str, dd: str, dds: str) -> None:
    """Print one heading followed by a line per disk listing its flagged attributes."""
    print('\n')
    print(f'{dt}{heading}{dts}')
    for disk, problems in disks.items():
        disk_name = f'- {disk}' if html else f'\t- {disk}'
        print(f'{dd}{disk_name}: {", ".join(p["display_name"] for p in problems)}{dds}')
    print('\n', end='')


def main(args):
    """Run the check and exit with the resulting Nagios status code.

    :param args: parsed command-line namespace; reads ``ignore_non_smart``,
        ``scrutiny_endpoint``, ``time_delta_limit``, ``warn_non_critical``,
        ``html`` and (if present) ``dont_warn_no_data``.

    Attributes with status 4 make a disk critical; attributes with status 2
    make it a warning only when ``warn_non_critical`` is set; status 0 is
    ignored. Data older than ``time_delta_limit`` hours is reported as
    CRITICAL. This function never returns: it always ends in ``sys.exit``.
    """
    results = {}
    wwn_ids = get_disk_wwn_ids(args.ignore_non_smart)
    metrics_out_of_date = False
    stale_cutoff = datetime.now(pytz.utc) - timedelta(hours=args.time_delta_limit)

    for wwn_id in wwn_ids:
        smart_health = get_smart_health(wwn_id, args.scrutiny_endpoint)
        if smart_health.get('fetch_error'):
            print('UNKNOWN -', smart_health.get('fetch_error'))
            sys.exit(nagios.UNKNOWN)

        name = f'/dev/{smart_health["data"]["device"]["device_name"]} {wwn_id}'  # differentiate disks in RAID arrays
        results[name] = {
            'wwn_id': wwn_id,
            'failed_attributes': [],
        }
        metadata = smart_health['metadata']

        last_updated = dateparser.parse(smart_health['data']['device']['UpdatedAt'])
        if last_updated is None:
            # dateparser returns None for unparseable input; report that
            # explicitly rather than crashing on the attribute access below.
            print(f'UNKNOWN - could not parse the UpdatedAt timestamp for disk {wwn_id}')
            sys.exit(nagios.UNKNOWN)
        if last_updated.tzinfo is None:
            # Naive timestamps are taken to be UTC.
            last_updated = last_updated.replace(tzinfo=pytz.utc)
        else:
            # Convert aware timestamps instead of overwriting their offset.
            last_updated = last_updated.astimezone(pytz.utc)
        if stale_cutoff > last_updated:
            metrics_out_of_date = True

        if smart_health['data'].get('smart_results'):
            try:
                disk_data = smart_health['data']['smart_results'][0]['attrs']
            except Exception as e:
                print('UNKNOWN - failed to parse data:', e)
                print('Key "data":', smart_health['data'].keys())
                print('Key "smart_results":', len(smart_health['data']['smart_results']))
                print('Key "attrs":', smart_health['data']['smart_results'][0].keys())
                sys.exit(nagios.UNKNOWN)

            for attribute_id, values in disk_data.items():
                if values['status'] == 0:
                    continue
                values['attribute_name'] = metadata[attribute_id]['display_name']
                values['metadata'] = metadata[attribute_id]
                if 'observed_thresholds' in values['metadata'].keys():
                    del values['metadata']['observed_thresholds']
                results[name]['failed_attributes'].append(values)
            results[name]['status'] = 'good'
        else:
            results[name]['status'] = 'no data'

    crit_disks = {}
    warn_disks = {}
    # getattr keeps main() usable with namespaces that lack the option.
    warn_on_no_data = not getattr(args, 'dont_warn_no_data', False)
    for disk, values in results.items():
        if values['status'] == 'no data':
            if warn_on_no_data:
                warn_disks.setdefault(disk, []).append({
                    'raw_value': 'no data',
                    'display_name': 'no data'
                })
            continue
        for item in values['failed_attributes']:
            entry = {
                'raw_value': item['raw_value'],
                'display_name': item['metadata']['display_name']
            }
            if item['status'] == 2 and args.warn_non_critical:
                warn_disks.setdefault(disk, []).append(entry)
            if item['status'] == 4:
                crit_disks.setdefault(disk, []).append(entry)

    # NOTE(review): in the source these literals (and the bare prints around
    # each disk listing) contain only a line break. The names suggest HTML
    # definition-list markup was stripped at some point, and --pretty-url is
    # validated but never referenced here -- confirm against the upstream file.
    dt = '\n' if args.html else ''
    dts = '\n' if args.html else ''
    dd = '\n' if args.html else '\t- '
    dds = '\n' if args.html else ''

    out_of_date_str = f'metrics are >{args.time_delta_limit} hrs out of date ' if metrics_out_of_date else ''

    return_code = nagios.OK
    if crit_disks:
        return_code = nagios.CRITICAL
        # Pluralise on the number of warning disks, not the number of disks checked.
        warn_suffix = f' and {len(warn_disks)} {_plural(len(warn_disks), "warning")}' if warn_disks else ''
        stale_prefix = out_of_date_str + 'and ' if out_of_date_str else ''
        print(f'CRITICAL: {stale_prefix}{len(crit_disks)} {_plural(len(crit_disks), "error")}{warn_suffix}')
        _print_disk_section('Disks with Errors:', crit_disks, args.html, dt, dts, dd, dds)

    if out_of_date_str and not crit_disks:
        return_code = nagios.CRITICAL
        warn_suffix = f'and {len(warn_disks)} {_plural(len(warn_disks), "warning")}' if warn_disks else ''
        print(f'CRITICAL: {out_of_date_str}{warn_suffix}')

    if warn_disks:
        if return_code < nagios.CRITICAL:
            return_code = nagios.WARNING
            print(f'WARNING: {len(warn_disks)} {_plural(len(warn_disks), "warning")}')
        _print_disk_section('Disks with issues:', warn_disks, args.html, dt, dts, dd, dds)

    # Only claim health when nothing was flagged, including stale metrics;
    # otherwise the OK text would follow a CRITICAL status line.
    if return_code == nagios.OK:
        print(f'OK: all {len(results)} {"disks" if len(results) > 1 else "disk"} are healthy!', end='')

    print(f"|'warnings'={len(warn_disks)};;; 'errors'={len(crit_disks)};;; 'num_disks'={len(results)};;;")
    sys.exit(return_code)


def is_smartmontools_installed():
    """Return True when ``dpkg -s smartmontools`` reports the package as installed.

    Detection is dpkg-based. A failing ``dpkg`` call, or a ``dpkg`` binary
    that cannot be executed at all, yields False rather than an exception.
    """
    try:
        result = subprocess.run(['dpkg', '-s', 'smartmontools'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                check=True, text=True)
    except (subprocess.CalledProcessError, OSError):
        return False
    return 'Status: install ok installed' in result.stdout


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('--scrutiny-endpoint', required=True, help='Base URL for scrutiny.')
    parser.add_argument('--time-delta-limit', default=24, type=int,
                        help='The Scrutiny data must not be older than this many hours. Default: 24.')
    parser.add_argument('--warn-non-critical', action='store_true',
                        help='Warn when a non-critical metric is marked as failed.')
    parser.add_argument('--html', action='store_true', help='Print HTML.')
    parser.add_argument('--pretty-url', help='The pretty URL to link to when printing HTML.')
    parser.add_argument('--ignore-non-smart', action='store_true',
                        help="Ignore any non-SMART devices and any devices that error when reading SMART.")
    parser.add_argument('--dont-warn-no-data', action='store_true',
                        help="Don't warn if there is no data for a disk.")
    args = parser.parse_args()

    if args.html and not args.pretty_url:
        print('UNKNOWN - when using --html you must also set --pretty-url')
        sys.exit(nagios.UNKNOWN)

    if not is_smartmontools_installed():
        print('UNKNOWN - smartmontools is not installed.')
        sys.exit(nagios.UNKNOWN)

    args.scrutiny_endpoint = args.scrutiny_endpoint.strip('/')
    args.pretty_url = args.pretty_url.strip('/') if args.pretty_url else None

    try:
        main(args)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so normal exits from main() pass through untouched.
        print(f'UNKNOWN: exception "{e}"')
        import traceback

        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)