icinga2-checks/check_scrutiny_disks.py

128 lines
4.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import argparse
import subprocess
import sys
from typing import List
import requests
2023-05-28 14:12:07 -06:00
from checker import nagios
def get_disk_wwn_ids() -> List[str]:
    """Return the WWNs of local disks that report SMART support as enabled.

    Runs ``lsblk -o NAME,WWN,TYPE -d -n -p`` and keeps only rows that split
    into exactly three columns (name, WWN, type), whose type is ``disk`` and
    whose WWN is not the literal ``"0"``.  Each remaining device is probed
    with ``sudo smartctl -i <name>`` and kept only when the output contains
    ``SMART support is: Enabled``.

    If either command exits with a non-zero status, an ``UNKNOWN`` line is
    printed and the process exits with ``nagios.UNKNOWN``.
    """
    enabled_wwns: List[str] = []
    try:
        lsblk_raw = subprocess.check_output(["lsblk", "-o", "NAME,WWN,TYPE", "-d", "-n", "-p"])
        for row in lsblk_raw.decode("utf-8").strip().split("\n"):
            columns = row.split()
            # Rows with a missing column (e.g. no WWN) split into fewer than
            # three fields and are skipped.
            if len(columns) != 3:
                continue
            device_path, wwn, device_type = columns
            if wwn == "0" or device_type != "disk":
                continue
            smartctl_info = subprocess.check_output(["sudo", "smartctl", "-i", device_path]).decode("utf-8")
            if "SMART support is: Enabled" in smartctl_info:
                enabled_wwns.append(wwn)
    except subprocess.CalledProcessError as err:
        print(f"UNKNOWN: subprocess Error - {err}")
        sys.exit(nagios.UNKNOWN)
    return enabled_wwns
def get_smart_health(wwn_id: str, scrutiny_endpoint: str, timeout: float = 10.0) -> dict:
    """Fetch the Scrutiny device details for one disk.

    Args:
        wwn_id: WWN of the disk, used in the request path.
        scrutiny_endpoint: Base URL of the Scrutiny server, without a
            trailing slash.
        timeout: Seconds to wait for the server before ``requests`` gives up.

    Returns:
        The decoded JSON body on HTTP 200.  For any other status a message
        is printed and an empty dict is returned.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    url = f"{scrutiny_endpoint}/api/device/{wwn_id}/details"
    # requests has no default timeout; without one an unresponsive server
    # would make the check hang instead of failing.
    response = requests.get(url, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    if response.status_code == 404:
        print(f"Disk {wwn_id} not found on Scrutiny")
    else:
        print(f"Scrutiny Error {response.status_code} for disk {wwn_id}: {response.text}")
    return {}
2023-05-28 14:12:07 -06:00
def _failed_attributes(smart_health: dict) -> list:
    """Return the non-zero-status attributes from the first ``smart_results`` entry.

    Each returned attribute dict is annotated in place with
    ``attribute_name`` (the metadata display name, falling back to the
    attribute id) and ``metadata`` (a copy of its metadata entry without
    ``observed_thresholds``).
    """
    smart_results = smart_health['data'].get('smart_results') or []
    if not smart_results:
        # No SMART results for this device: nothing to report.
        return []
    metadata = smart_health.get('metadata') or {}
    failed = []
    for attribute_id, values in (smart_results[0].get('attrs') or {}).items():
        if values['status'] == 0:
            continue
        # Copy so removing the bulky thresholds does not touch the response.
        attr_metadata = dict(metadata.get(attribute_id) or {})
        attr_metadata.pop('observed_thresholds', None)
        values['attribute_name'] = attr_metadata.get('display_name', str(attribute_id))
        values['metadata'] = attr_metadata
        failed.append(values)
    return failed


def main(args):
    """Check every SMART-enabled local disk against Scrutiny and exit.

    Args:
        args: Parsed command-line namespace.  Reads ``scrutiny_endpoint``
            (base URL) and ``warn_non_critical`` (report status-2
            attributes as warnings).

    Prints a status summary followed by a perfdata line, then calls
    ``sys.exit`` with ``nagios.OK``, ``nagios.WARNING`` or
    ``nagios.CRITICAL``.  Disks for which ``get_smart_health`` returns an
    empty dict are skipped.
    """
    results = {}
    for wwn_id in get_disk_wwn_ids():
        smart_health = get_smart_health(wwn_id, args.scrutiny_endpoint)
        if not smart_health:
            # get_smart_health() already printed why there is no data.
            continue
        device_name = smart_health['data']['device']['device_name']
        results[device_name] = {
            'wwn_id': wwn_id,
            'failed_attributes': _failed_attributes(smart_health),
        }

    crit_disks = {}
    warn_disks = {}
    for disk, values in results.items():
        for item in values['failed_attributes']:
            entry = {
                'raw_value': item.get('raw_value'),
                'display_name': item['attribute_name'],
            }
            # NOTE(review): attributes with a non-zero status other than 2
            # or 4 are collected but never reported -- confirm this is
            # intended.
            if item['status'] == 2 and args.warn_non_critical:
                warn_disks.setdefault(disk, []).append(entry)
            if item['status'] == 4:
                crit_disks.setdefault(disk, []).append(entry)

    return_code = nagios.OK
    if crit_disks:
        return_code = nagios.CRITICAL
        print(f'CRITICAL: {len(crit_disks)} {"error" if len(crit_disks) == 1 else "errors"} - {args.scrutiny_endpoint}')
        print('Disks with Errors:')
        for disk, errors in crit_disks.items():
            print(f'\t- /dev/{disk}: {", ".join([x["display_name"] for x in errors])}')
    if warn_disks:
        if return_code < nagios.CRITICAL:
            return_code = nagios.WARNING
        print(f'WARNING: {len(warn_disks)} {"warning" if len(warn_disks) == 1 else "warnings"} - {args.scrutiny_endpoint}')
        print('Disks with issues:')
        for disk, warns in warn_disks.items():
            print(f'\t- /dev/{disk}: {", ".join([x["display_name"] for x in warns])}')
    if not crit_disks and not warn_disks:
        # end='' keeps the perfdata on the same line as the OK summary.
        print(f'OK: all {len(results)} {"disk" if len(results) == 1 else "disks"} are healthy!', end='')
    print(f"|'warnings'={len(warn_disks)};;; 'errors'={len(crit_disks)};;; 'num_disks'={len(results)};;;")
    sys.exit(return_code)
if __name__ == "__main__":
    # Script entry point: parse the command line, normalise the endpoint and
    # run the check.  Any unexpected exception is reported as UNKNOWN along
    # with its traceback.
    cli = argparse.ArgumentParser(description='')
    cli.add_argument(
        '--scrutiny-endpoint',
        required=True,
        help='Base URL for scrutiny.',
    )
    cli.add_argument(
        '--warn-non-critical',
        action='store_true',
        help='Warn when a non-critical metric is marked as failed.',
    )
    args = cli.parse_args()
    # Drop surrounding slashes so URL paths can be appended with a single "/".
    args.scrutiny_endpoint = args.scrutiny_endpoint.strip('/')
    try:
        main(args)
    except Exception as e:
        import traceback

        print(f'UNKNOWN: exception "{e}"')
        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)