icinga2-checks/check_scrutiny_disks.py

237 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import subprocess
import sys
from datetime import datetime, timedelta
import pytz
from typing import List
import requests
from checker import nagios
from checker.http import fetch_with_retry
def get_disk_wwn_ids(ignore_non_smart: bool = False) -> List[str]:
    """Return the WWN IDs of every SMART-enabled disk reported by smartctl.

    Runs ``sudo smartctl --scan`` to enumerate devices, then
    ``sudo smartctl -i`` on each one. Devices whose info output does not
    contain ``SMART support is: Enabled`` are skipped.

    :param ignore_non_smart: when true, devices for which ``smartctl -i``
        exits non-zero, or that report no ``LU WWN Device Id``, are skipped
        instead of aborting the check.
    :return: WWN IDs formatted as ``0x<hex digits>`` with all spaces removed.

    Exits the process with ``nagios.UNKNOWN`` if the scan fails, or if a
    device cannot be queried / has no WWN and ``ignore_non_smart`` is false.
    """
    try:
        scan_output = subprocess.check_output(["sudo", "smartctl", "--scan"])
    except subprocess.CalledProcessError as e:
        print(f"UNKNOWN: subprocess Error - {e}")
        sys.exit(nagios.UNKNOWN)

    wwn_ids = []
    for scan_line in scan_output.decode("utf-8").strip().split("\n"):
        parts = scan_line.split()
        if len(parts) < 3:
            # Blank or malformed scan line: nothing to query.
            continue
        device = parts[0]
        # Scan lines look like "<device> -d <type> ..."; 'scsi' is swapped for
        # 'sat,auto' before the type is handed back to smartctl.
        device_type = parts[2].replace('scsi', 'sat,auto')
        try:
            device_info = subprocess.check_output(
                ["sudo", "smartctl", "-i", device, "-d", device_type]).decode("utf-8")
        except subprocess.CalledProcessError as e:
            if ignore_non_smart:
                continue
            print(f"UNKNOWN: subprocess Error - {e}")
            sys.exit(nagios.UNKNOWN)

        if "SMART support is: Enabled" not in device_info:
            # TODO: warn if a drive doesn't support SMART
            continue

        wwn_lines = [info_line for info_line in device_info.split("\n")
                     if "LU WWN Device Id" in info_line]
        if not wwn_lines:
            # Previously this fell through to wwn_line[0] and died with a bare
            # IndexError; report the actual problem instead.
            if ignore_non_smart:
                continue
            print(f"UNKNOWN: no LU WWN Device Id reported for {device}")
            sys.exit(nagios.UNKNOWN)

        # "LU WWN Device Id: 5 000c50 0a1b2c3d4" -> "0x5000c500a1b2c3d4"
        wwn_digits = wwn_lines[0].partition(':')[2].replace(' ', '').strip()
        wwn_ids.append('0x' + wwn_digits)
    return wwn_ids
def get_smart_health(wwn_id: str, scrutiny_endpoint: str) -> dict:
    """Fetch the Scrutiny device-details document for one disk.

    Requests ``<scrutiny_endpoint>/api/device/<wwn_id>/details`` through
    ``fetch_with_retry``.

    :param wwn_id: WWN ID of the disk, used verbatim in the URL path.
    :param scrutiny_endpoint: base URL of the Scrutiny server.
    :return: the decoded JSON body on HTTP 200; otherwise a dict holding a
        single ``'fetch_error'`` key with a human-readable message.
    """
    response = fetch_with_retry(f"{scrutiny_endpoint}/api/device/{wwn_id}/details")
    status = response.status_code
    if status == 200:
        return response.json()

    # Anything other than 200 is reported to the caller as a fetch error;
    # a 404 gets a dedicated "not found" message.
    if status == 404:
        message = f"Disk {wwn_id} not found on Scrutiny"
    else:
        message = f"Error {status} for disk {wwn_id}: {response.text}"
    return {'fetch_error': message}
def _parse_updated_at(raw: str) -> datetime:
    """Parse a Scrutiny ``UpdatedAt`` timestamp into an aware datetime.

    Accepts any number of fractional-second digits (only the first six are
    kept, since ``%f`` takes at most six) followed by a ``Z`` or numeric UTC
    offset, e.g. ``2023-04-28T23:00:03.071184465Z``. A timestamp without a
    fractional part is accepted too. Raises ``ValueError`` on anything else.
    """
    head, dot, tail = raw.strip().partition('.')
    if not dot:
        return datetime.strptime(head, '%Y-%m-%dT%H:%M:%S%z')
    digit_count = len(tail) - len(tail.lstrip('0123456789'))
    fraction, zone = tail[:digit_count], tail[digit_count:]
    return datetime.strptime(f'{head}.{fraction[:6]}{zone}', '%Y-%m-%dT%H:%M:%S.%f%z')


def _collect_failed_attributes(disk_data: dict, metadata: dict) -> list:
    """Return the attributes whose status is non-zero, annotated with metadata."""
    failed = []
    for attribute_id, values in disk_data.items():
        if values['status'] == 0:
            continue
        # Copy the metadata (minus the bulky 'observed_thresholds') instead of
        # deleting the key from the shared response dict. Fall back to a
        # generic label when Scrutiny has no metadata for this attribute.
        attr_meta = {
            key: value
            for key, value in metadata.get(attribute_id, {}).items()
            if key != 'observed_thresholds'
        }
        attr_meta.setdefault('display_name', f'Unknown attribute {attribute_id}')
        values['attribute_name'] = attr_meta['display_name']
        values['metadata'] = attr_meta
        failed.append(values)
    return failed


def _print_disk_list(title: str, disks: dict, results: dict, args) -> None:
    """Print one section listing each disk and its flagged attribute names.

    HTML mode wraps the section in a ``<dl>`` and links each disk to its
    Scrutiny page; plain mode prints a title line and tab-indented bullets.
    """
    if args.html:
        print('<dl>')
        print(f'<dt>{title}</dt>')
    else:
        print(title)
    for disk, flagged in disks.items():
        names = ", ".join(entry["display_name"] for entry in flagged)
        if args.html:
            link = f'- <a href="{args.pretty_url}/web/device/{results[disk]["wwn_id"]}" target="_blank">{disk}</a>'
            print(f'<dd>{link}: {names}</dd>')
        else:
            print(f'\t- {disk}: {names}')
    if args.html:
        # No trailing newline, so whatever is printed next follows directly.
        print('</dl>', end='')


def main(args):
    """Check every local SMART disk against Scrutiny and exit with a Nagios code.

    For each WWN returned by ``get_disk_wwn_ids`` the Scrutiny details are
    fetched; attributes with status 4 are counted as errors, attributes with
    status 2 as warnings (only with ``--warn-non-critical``), and disks with
    no SMART results as warnings (unless ``--dont-warn-no-data``). Data older
    than ``--time-delta-limit`` hours makes the check CRITICAL.

    NOTE(review): attributes with any other non-zero status (e.g. 1) are
    collected but never reported — confirm against Scrutiny's status values.

    Always terminates via ``sys.exit``: UNKNOWN on fetch/parse failures,
    otherwise OK / WARNING / CRITICAL. A perfdata line is printed last.
    """
    results = {}
    metrics_out_of_date = False
    stale_before = datetime.now(pytz.utc) - timedelta(hours=args.time_delta_limit)

    for wwn_id in get_disk_wwn_ids(args.ignore_non_smart):
        smart_health = get_smart_health(wwn_id, args.scrutiny_endpoint)
        if smart_health.get('fetch_error'):
            print('UNKNOWN -', smart_health.get('fetch_error'))
            sys.exit(nagios.UNKNOWN)

        device = smart_health['data']['device']
        name = f'/dev/{device["device_name"]} {wwn_id}'  # differentiate disks in RAID arrays
        disk_result = {'wwn_id': wwn_id, 'failed_attributes': []}
        results[name] = disk_result
        metadata = smart_health['metadata']

        if _parse_updated_at(device['UpdatedAt']) < stale_before:
            metrics_out_of_date = True

        # .get() + truthiness also covers a null/absent 'smart_results'.
        smart_results = smart_health['data'].get('smart_results')
        if not smart_results:
            disk_result['status'] = 'no data'
            continue
        try:
            disk_data = smart_results[0]['attrs']
        except (KeyError, IndexError, TypeError) as e:
            print('UNKNOWN - failed to parse data:', e)
            print('Key "data":', smart_health['data'].keys())
            print('Key "smart_results":', len(smart_results))
            print('Key "attrs":', smart_results[0].keys())
            sys.exit(nagios.UNKNOWN)
        disk_result['failed_attributes'] = _collect_failed_attributes(disk_data, metadata)
        disk_result['status'] = 'good'

    crit_disks = {}
    warn_disks = {}
    # getattr keeps main() usable with a namespace that predates the option.
    warn_no_data = not getattr(args, 'dont_warn_no_data', False)
    for disk, values in results.items():
        if values['status'] == 'no data':
            if warn_no_data:
                warn_disks.setdefault(disk, []).append({
                    'raw_value': 'no data',
                    'display_name': 'no data'
                })
            continue
        for item in values['failed_attributes']:
            summary = {
                'raw_value': item.get('raw_value'),
                'display_name': item['metadata']['display_name']
            }
            if item['status'] == 2 and args.warn_non_critical:
                warn_disks.setdefault(disk, []).append(summary)
            if item['status'] == 4:
                crit_disks.setdefault(disk, []).append(summary)

    out_of_date_str = f'metrics are >{args.time_delta_limit} hrs out of date ' if metrics_out_of_date else ''
    warning_word = "warnings" if len(warn_disks) > 1 else "warning"
    return_code = nagios.OK

    if crit_disks:
        return_code = nagios.CRITICAL
        error_word = "errors" if len(crit_disks) > 1 else "error"
        stale_prefix = out_of_date_str + "and " if out_of_date_str else ""
        warn_suffix = f' and {len(warn_disks)} {warning_word}' if warn_disks else ''
        print(f'CRITICAL: {stale_prefix}{len(crit_disks)} {error_word}{warn_suffix}')
        _print_disk_list('Disks with Errors:', crit_disks, results, args)
    elif out_of_date_str:
        # Stale metrics alone are enough to make the check CRITICAL.
        return_code = nagios.CRITICAL
        warn_suffix = f'and {len(warn_disks)} {warning_word}' if warn_disks else ''
        print(f'CRITICAL: {out_of_date_str}{warn_suffix}')

    if warn_disks:
        if return_code < nagios.CRITICAL:
            return_code = nagios.WARNING
            print(f'WARNING: {len(warn_disks)} {warning_word}')
        _print_disk_list('Disks with issues:', warn_disks, results, args)

    # Only claim health when nothing above raised the status (previously the
    # OK line was also printed after a stale-metrics CRITICAL).
    if return_code == nagios.OK:
        print(f'OK: all {len(results)} {"disks" if len(results) > 1 else "disk"} are healthy!', end='')

    print(f"|'warnings'={len(warn_disks)};;; 'errors'={len(crit_disks)};;; 'num_disks'={len(results)};;;")
    sys.exit(return_code)
def is_smartmontools_installed():
    """Return True if dpkg reports the ``smartmontools`` package as installed.

    Runs ``dpkg -s smartmontools`` and looks for
    ``Status: install ok installed`` in its output. Returns False when dpkg
    exits non-zero or when dpkg itself cannot be executed (e.g. it is not
    present on this system), rather than letting the OSError escape as an
    uncaught traceback.

    NOTE(review): this only consults dpkg, so a smartctl installed by other
    means is reported as missing.
    """
    try:
        result = subprocess.run(['dpkg', '-s', 'smartmontools'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                check=True, text=True)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: dpkg ran but returned non-zero.
        # OSError (incl. FileNotFoundError): dpkg could not be started.
        return False
    return 'Status: install ok installed' in result.stdout
if __name__ == "__main__":
    # Command-line entry point: parse options, validate them, then run the check.
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('--scrutiny-endpoint', required=True, help='Base URL for scrutiny.')
    parser.add_argument('--time-delta-limit', default=24, type=int,
                        help='The Scrutiny data must not be older than this many hours. Default: 24.')
    parser.add_argument('--warn-non-critical', action='store_true',
                        help='Warn when a non-critical metric is marked as failed.')
    parser.add_argument('--html', action='store_true', help='Print HTML.')
    parser.add_argument('--pretty-url', help='The pretty URL to link to when printing HTML.')
    parser.add_argument('--ignore-non-smart', action='store_true',
                        help="Ignore any non-SMART devices and any devices that error when reading SMART.")
    parser.add_argument('--dont-warn-no-data', action='store_true', help="Don't warn if there is no data for a disk.")
    args = parser.parse_args()

    # HTML output links every disk to <pretty-url>/web/device/<wwn>, so the URL is mandatory there.
    if args.html and not args.pretty_url:
        print('UNKNOWN - when using --html you must also set --pretty-url')
        sys.exit(nagios.UNKNOWN)

    if not is_smartmontools_installed():
        print('UNKNOWN - smartmontools is not installed.')
        sys.exit(nagios.UNKNOWN)

    # Drop surrounding slashes so URLs can be built as f"{base}/api/...".
    args.scrutiny_endpoint = args.scrutiny_endpoint.strip('/')
    args.pretty_url = args.pretty_url.strip('/') if args.pretty_url else None

    try:
        main(args)
    except Exception as e:
        # Any unexpected failure is reported as UNKNOWN with the traceback attached.
        # sys.exit() inside main() raises SystemExit, which is not caught here.
        print(f'UNKNOWN: exception "{e}"')
        import traceback

        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)