icinga2-checks/check_scrutiny_disks.py

237 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import subprocess
import sys
from datetime import datetime, timedelta
from typing import List
import pandas as pd
import pytz
from checker import nagios
from checker.http import fetch_with_retry
def get_disk_wwn_ids(ignore_non_smart: bool = False) -> List[str]:
    """
    Collect the WWN IDs of the SMART-enabled disks listed by `smartctl --scan`.

    Every scanned device is queried with `smartctl -i`. Devices whose output
    contains "SMART support is: Enabled" contribute the value of their
    "LU WWN Device Id" line, with the spaces removed and a `0x` prefix added.
    Devices that do not report SMART as enabled are skipped silently.

    :param ignore_non_smart: When True, skip devices for which `smartctl -i`
        exits non-zero or which print no WWN line. When False, either case
        prints an UNKNOWN message and exits with `nagios.UNKNOWN`.
    :return: The collected WWN IDs, in scan order.

    A failure of the scan command itself always exits with `nagios.UNKNOWN`.
    """
    wwn_ids = []
    try:
        scan_output = subprocess.check_output(["sudo", "smartctl", "--scan"])
    except subprocess.CalledProcessError as e:
        print(f"UNKNOWN: subprocess Error - {e}")
        sys.exit(nagios.UNKNOWN)

    for scan_line in scan_output.decode("utf-8").strip().split("\n"):
        parts = scan_line.split()
        if len(parts) < 3:
            # Blank or malformed scan line: no device/type pair to query.
            continue
        device = parts[0]
        # NOTE(review): presumably lets smartctl auto-detect SATA disks that the
        # scan reports as plain SCSI -- confirm against the smartctl man page.
        device_type = parts[2].replace('scsi', 'sat,auto')
        try:
            device_info = subprocess.check_output(
                ["sudo", "smartctl", "-i", device, "-d", device_type]).decode("utf-8")
        except subprocess.CalledProcessError as e:
            if ignore_non_smart:
                continue
            print(f"UNKNOWN: subprocess Error - {e}")
            sys.exit(nagios.UNKNOWN)

        if "SMART support is: Enabled" not in device_info:
            # TODO: warn if a drive doesn't support SMART
            continue

        wwn_line = next(
            (info_line for info_line in device_info.split("\n") if "LU WWN Device Id" in info_line),
            None,
        )
        if wwn_line is None:
            # Without a WWN the disk cannot be looked up by ID; treat it like
            # any other per-device read failure instead of raising IndexError.
            if ignore_non_smart:
                continue
            print(f"UNKNOWN: no WWN reported by smartctl for {device}")
            sys.exit(nagios.UNKNOWN)

        wwn_ids.append('0x' + wwn_line.replace('LU WWN Device Id: ', '').replace(' ', ''))
    return wwn_ids
def get_smart_health(wwn_id: str, scrutiny_endpoint: str) -> dict:
    """
    Fetch the Scrutiny details document for a single disk.

    :param wwn_id: WWN ID of the disk, used verbatim in the request path.
    :param scrutiny_endpoint: Base URL of the Scrutiny instance.
    :return: The decoded JSON body on HTTP 200. For any other status, a dict
        with a single `fetch_error` key describing the failure.
    """
    response = fetch_with_retry(f"{scrutiny_endpoint}/api/device/{wwn_id}/details")

    if response.status_code == 200:
        return response.json()

    # Every non-200 outcome is reported through the same one-key error dict;
    # only the message differs between "unknown disk" and other failures.
    if response.status_code == 404:
        message = f"Disk {wwn_id} not found on Scrutiny"
    else:
        message = f"Error {response.status_code} for disk {wwn_id}: {response.text}"
    return {'fetch_error': message}
def _print_disk_list(title, disks, results, args):
    """Print one titled list of disks followed by the display names of their flagged attributes."""
    dt = '<dt>' if args.html else ''
    dts = '</dt>' if args.html else ''
    dd = '<dd>' if args.html else '\t- '
    dds = '</dd>' if args.html else ''

    # The <dl> wrapper is markup, so it is only emitted in HTML mode.
    if args.html:
        print('<dl>')
    print(f'{dt}{title}{dts}')
    for disk, flagged in disks.items():
        if args.html:
            disk_name = f'- <a href="{args.pretty_url}/web/device/{results[disk]["wwn_id"]}" target="_blank">{disk}</a>'
        else:
            disk_name = f'\t- {disk}'
        print(f'{dd}{disk_name}: {", ".join([entry["display_name"] for entry in flagged])}{dds}')
    if args.html:
        print('</dl>', end='')


def main(args):
    """
    Query Scrutiny for every local SMART disk, print a Nagios-style report and exit.

    For each WWN ID returned by `get_disk_wwn_ids`, the Scrutiny details are
    fetched and every attribute of the first `smart_results` entry with a
    non-zero status is collected. Attributes with status 4 make the disk
    critical; status 2 makes it a warning when `args.warn_non_critical` is set.
    A disk with no `smart_results` is a warning unless `args.dont_warn_no_data`
    is set. If any disk's `UpdatedAt` is older than `args.time_delta_limit`
    hours the check is critical as well.

    :param args: Parsed command-line arguments (`scrutiny_endpoint`,
        `time_delta_limit`, `warn_non_critical`, `html`, `pretty_url`,
        `ignore_non_smart` and optionally `dont_warn_no_data`).

    Never returns: always ends with `sys.exit` using a `nagios` status code.
    """
    results = {}
    wwn_ids = get_disk_wwn_ids(args.ignore_non_smart)
    metrics_out_of_date = False
    # Read defensively so callers that build `args` without this flag still work.
    ignore_no_data = getattr(args, 'dont_warn_no_data', False)

    for wwn_id in wwn_ids:
        smart_health = get_smart_health(wwn_id, args.scrutiny_endpoint)
        if smart_health.get('fetch_error'):
            print('UNKNOWN -', smart_health.get('fetch_error'))
            sys.exit(nagios.UNKNOWN)

        name = f'/dev/{smart_health["data"]["device"]["device_name"]} {wwn_id}'  # differentiate disks in RAID arrays
        results[name] = {
            'wwn_id': wwn_id,
            'failed_attributes': [],
        }
        metadata = smart_health['metadata']

        # For testing
        # smart_health['data']['device']['UpdatedAt'] = '2023-04-28T23:00:03.071184465Z'
        last_updated = pd.to_datetime(smart_health['data']['device']['UpdatedAt'])
        if datetime.now(pytz.utc) - timedelta(hours=args.time_delta_limit) > last_updated.replace(tzinfo=pytz.utc):
            metrics_out_of_date = True

        if not smart_health['data']['smart_results']:
            results[name]['status'] = 'no data'
            continue

        try:
            disk_data = smart_health['data']['smart_results'][0]['attrs']
        except (KeyError, IndexError, TypeError) as e:
            print('UNKNOWN - failed to parse data:', e)
            print('Key "data":', smart_health['data'].keys())
            print('Key "smart_results":', len(smart_health['data']['smart_results']))
            print('Key "attrs":', smart_health['data']['smart_results'][0].keys())
            sys.exit(nagios.UNKNOWN)

        for attribute_id, values in disk_data.items():
            if values['status'] == 0:
                continue
            values['attribute_name'] = metadata[attribute_id]['display_name']
            values['metadata'] = metadata[attribute_id]
            # Drop the bulky threshold table; only the display name is used later.
            if 'observed_thresholds' in values['metadata']:
                del values['metadata']['observed_thresholds']
            results[name]['failed_attributes'].append(values)
        results[name]['status'] = 'good'

    # NOTE(review): only statuses 2 and 4 are classified below; any other
    # non-zero status is collected above but never reported -- confirm that
    # this matches the status values Scrutiny can return.
    crit_disks = {}
    warn_disks = {}
    for disk, values in results.items():
        if values['status'] == 'no data':
            if not ignore_no_data:
                warn_disks.setdefault(disk, []).append({
                    'raw_value': 'no data',
                    'display_name': 'no data'
                })
            continue
        for item in values['failed_attributes']:
            if item['status'] == 2 and args.warn_non_critical:
                warn_disks.setdefault(disk, []).append({
                    'raw_value': item['raw_value'],
                    'display_name': item['metadata']['display_name']
                })
            elif item['status'] == 4:
                crit_disks.setdefault(disk, []).append({
                    'raw_value': item['raw_value'],
                    'display_name': item['metadata']['display_name']
                })

    out_of_date_str = f'metrics are >{args.time_delta_limit} hrs out of date ' if metrics_out_of_date else ''
    warn_word = "warnings" if len(warn_disks) > 1 else "warning"

    return_code = nagios.OK
    if crit_disks:
        return_code = nagios.CRITICAL
        stale_prefix = out_of_date_str + "and " if out_of_date_str else ""
        warn_suffix = f' and {len(warn_disks)} {warn_word}' if warn_disks else ''
        error_word = "errors" if len(crit_disks) > 1 else "error"
        print(f'CRITICAL: {stale_prefix}{len(crit_disks)} {error_word}{warn_suffix}')
        _print_disk_list('Disks with Errors:', crit_disks, results, args)
    elif out_of_date_str:
        # Stale metrics alone are enough to go critical.
        return_code = nagios.CRITICAL
        warn_suffix = "and " + str(len(warn_disks)) + " " + warn_word if warn_disks else ""
        print(f'CRITICAL: {out_of_date_str}{warn_suffix}')

    if warn_disks:
        if return_code < nagios.CRITICAL:
            return_code = nagios.WARNING
            print(f'WARNING: {len(warn_disks)} {warn_word}')
        _print_disk_list('Disks with issues:', warn_disks, results, args)

    # Only claim health when nothing above raised the status; otherwise a
    # stale-metrics CRITICAL would be followed by a contradictory OK line.
    if return_code == nagios.OK:
        print(f'OK: all {len(results.keys())} {"disks" if len(results.keys()) > 1 else "disk"} are healthy!', end='')

    print(f"|'warnings'={len(warn_disks)};;; 'errors'={len(crit_disks)};;; 'num_disks'={len(results.keys())};;;")
    sys.exit(return_code)
def is_smartmontools_installed():
    """
    Report whether dpkg lists the `smartmontools` package as installed.

    :return: True when `dpkg -s smartmontools` succeeds and its output contains
        "Status: install ok installed". False when dpkg exits non-zero, when
        the status line is absent, or when dpkg itself cannot be executed.
    """
    try:
        result = subprocess.run(['dpkg', '-s', 'smartmontools'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                check=True, text=True)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: dpkg exited non-zero (check=True).
        # OSError: the dpkg binary could not be started at all (e.g. it is
        # not on this system); answer False instead of raising.
        return False
    return 'Status: install ok installed' in result.stdout
if __name__ == "__main__":
    # Command-line entry point: parse the options, validate them, make sure
    # smartmontools is available, then hand over to main().
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('--scrutiny-endpoint', required=True, help='Base URL for scrutiny.')
    parser.add_argument('--time-delta-limit', default=24, type=int,
                        help='The Scrutiny data must not be older than this many hours. Default: 24.')
    parser.add_argument('--warn-non-critical', action='store_true',
                        help='Warn when a non-critical metric is marked as failed.')
    parser.add_argument('--html', action='store_true', help='Print HTML.')
    parser.add_argument('--pretty-url', help='The pretty URL to link to when printing HTML.')
    parser.add_argument('--ignore-non-smart', action='store_true',
                        help="Ignore any non-SMART devices and any devices that error when reading SMART.")
    parser.add_argument('--dont-warn-no-data', action='store_true', help="Don't warn if there is no data for a disk.")
    args = parser.parse_args()

    # The HTML output links every disk to the pretty URL, so it is mandatory there.
    if args.html and not args.pretty_url:
        print('UNKNOWN - when using --html you must also set --pretty-url')
        sys.exit(nagios.UNKNOWN)

    if not is_smartmontools_installed():
        print('UNKNOWN - smartmontools is not installed.')
        sys.exit(nagios.UNKNOWN)

    # Strip slashes from both ends so URLs can be joined with a single '/'.
    args.scrutiny_endpoint = args.scrutiny_endpoint.strip('/')
    args.pretty_url = args.pretty_url.strip('/') if args.pretty_url else None

    try:
        main(args)
    except Exception as e:
        # main() finishes through sys.exit(); SystemExit is not an Exception
        # subclass, so only genuine failures land here and are reported as
        # UNKNOWN together with the traceback.
        print(f'UNKNOWN: exception "{e}"')
        import traceback

        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)