check_pve: option to ignore unknown disks

check_scrutiny_disks: draft
This commit is contained in:
Cyberes 2023-05-28 12:50:04 -06:00
parent f021c8cddd
commit 357f1f2d9e
2 changed files with 86 additions and 14 deletions

View File

@ -23,8 +23,8 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# ------------------------------------------------------------------------------
import sys
import re
import sys
try:
from enum import Enum
@ -93,14 +93,14 @@ class CheckPVE:
VERSION = '1.2.2'
API_URL = 'https://{hostname}:{port}/api2/json/{command}'
UNIT_SCALE = {
"GB": 10**9,
"MB": 10**6,
"KB": 10**3,
"GiB": 2**30,
"MiB": 2**20,
"KiB": 2**10,
"B": 1
}
"GB": 10 ** 9,
"MB": 10 ** 6,
"KB": 10 ** 3,
"GiB": 2 ** 30,
"MiB": 2 ** 20,
"KiB": 2 ** 10,
"B": 1
}
def check_output(self):
message = self.check_message
@ -259,7 +259,8 @@ class CheckPVE:
continue
if disk['health'] == 'UNKNOWN':
self.check_result = CheckState.WARNING
if not self.options.ignore_unknown_disks:
self.check_result = CheckState.WARNING
unknown.append({"serial": disk["serial"], "device": disk['devpath']})
elif disk['health'] not in ('PASSED', 'OK'):
@ -567,7 +568,7 @@ class CheckPVE:
is_critical = False
if not isinstance(value, dict):
value = { None: value }
value = {None: value}
for metric, value in value.items():
value_warning = self.threshold_warning(metric)
@ -591,7 +592,7 @@ class CheckPVE:
if self.options.unit in self.UNIT_SCALE:
return value / self.UNIT_SCALE[self.options.unit]
else:
assert('wrong unit')
assert ('wrong unit')
def threshold_warning(self, name: str):
return self.options.threshold_warning.get(name, self.options.threshold_warning.get(None, None))
@ -755,6 +756,8 @@ class CheckPVE:
check_opts.add_argument('--unit', choices=self.UNIT_SCALE.keys(), default='MiB', help='Unit which is used for performance data and other values')
check_opts.add_argument('--ignore-unknown-disks', action='store_true', help='Skip checking disks that have an unknown health status (usually because they don\'t support SMART.')
options = p.parse_args()
if not options.node and options.mode not in ['cluster', 'vm', 'vm_status', 'version', 'ceph-health']:
@ -787,9 +790,9 @@ class CheckPVE:
return ok
if options.threshold_warning and options.threshold_critical:
if options.mode != 'subscription' and not compare_thresholds(options.threshold_warning, options.threshold_critical, lambda w,c: w<=c):
if options.mode != 'subscription' and not compare_thresholds(options.threshold_warning, options.threshold_critical, lambda w, c: w <= c):
p.error("Critical value must be greater than warning value")
elif options.mode == 'subscription' and not compare_thresholds(options.threshold_warning, options.threshold_critical, lambda w,c: w>=c):
elif options.mode == 'subscription' and not compare_thresholds(options.threshold_warning, options.threshold_critical, lambda w, c: w >= c):
p.error("Critical value must be lower than warning value")
self.options = options
@ -815,5 +818,6 @@ class CheckPVE:
elif self.options.api_token is not None:
self.__headers["Authorization"] = "PVEAPIToken={}!{}".format(self.options.api_user, self.options.api_token)
pve = CheckPVE()
pve.check()

68
check_scrutiny_disks.py Executable file
View File

@ -0,0 +1,68 @@
#!/usr/bin/env python3
import argparse
import json
import subprocess
import sys
from typing import List
from checker import nagios
import requests
def get_disk_wwn_ids() -> List[str]:
    """Return the WWNs of local disks whose ``smartctl -i`` output reports SMART as enabled.

    Block devices are listed with ``lsblk`` (name, WWN and type columns, no
    header, full device paths, no dependent devices). Every row of type
    ``disk`` with a WWN other than ``"0"`` is then probed with
    ``smartctl -i <device>``.

    A failing ``lsblk`` call yields an empty list. A failing ``smartctl``
    call only skips the affected disk, so one unreadable device no longer
    hides all the disks listed after it.

    Returns:
        The WWN strings of the disks that passed the SMART check, in the
        order ``lsblk`` listed them.
    """
    wwn_ids: List[str] = []

    try:
        output = subprocess.check_output(["lsblk", "-o", "NAME,WWN,TYPE", "-d", "-n", "-p"])
    except subprocess.CalledProcessError as e:
        print(f"Subprocess Error: {e}")
        return wwn_ids

    for line in output.decode("utf-8").splitlines():
        parts = line.split()
        # Rows that do not split into exactly three fields are ignored
        # (presumably devices for which lsblk prints an empty WWN column).
        if len(parts) != 3:
            continue
        name, wwn, disk_type = parts
        if wwn == "0" or disk_type != "disk":
            continue

        # Probe each disk on its own: a non-zero smartctl exit status for
        # one device must not abort the enumeration of the remaining ones.
        try:
            smart_info = subprocess.check_output(["smartctl", "-i", name]).decode("utf-8")
        except subprocess.CalledProcessError as e:
            print(f"Subprocess Error: {e}")
            continue

        if "SMART support is: Enabled" in smart_info:
            wwn_ids.append(wwn)

    return wwn_ids
def get_smart_health(wwn_id: str, scrutiny_endpoint: str, timeout: float = 10.0) -> dict:
    """Fetch the device details for one disk from a Scrutiny server.

    Args:
        wwn_id: WWN of the disk, used as the device identifier in the URL.
        scrutiny_endpoint: Base URL of the Scrutiny server; the path
            ``/api/device/<wwn_id>/details`` is appended to it.
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.get``.

    Returns:
        The decoded JSON body on HTTP 200. On any other status a message is
        printed and an empty dict is returned.

    Raises:
        requests.exceptions.RequestException: Propagated from
            ``requests.get`` (connection errors, timeouts, ...).
    """
    url = f"{scrutiny_endpoint}/api/device/{wwn_id}/details"
    # requests never times out on its own; without an explicit timeout an
    # unresponsive server would block the check forever.
    response = requests.get(url, timeout=timeout)

    if response.status_code == 200:
        return response.json()

    if response.status_code == 404:
        print(f"Disk {wwn_id} not found on Scrutiny")
    else:
        print(f"Scrutiny Error {response.status_code} for disk {wwn_id}: {response.text}")
    return {}
def main(scrutiny_endpoint: str):
    """Print the Scrutiny SMART data of every locally detected disk.

    For each WWN returned by ``get_disk_wwn_ids`` the device details are
    requested via ``get_smart_health``. Disks with an empty result are
    skipped; for the others the full JSON document is printed, followed by
    each entry of the first SMART result's ``attrs``.

    Args:
        scrutiny_endpoint: Base URL of the Scrutiny server.
    """
    # NOTE(review): `results` is filled in below but is neither returned nor
    # read anywhere in this function -- looks like unfinished draft code.
    results = {}

    for wwn_id in get_disk_wwn_ids():
        smart_health = get_smart_health(wwn_id, scrutiny_endpoint)
        if not smart_health:
            continue

        print(f"Disk {wwn_id} SMART health:")
        print(json.dumps(smart_health, indent=2))

        payload = smart_health['data']
        for metric in payload['smart_results'][0]['attrs']:
            print(metric)

        results[payload['device']['device_name']] = {}
if __name__ == "__main__":
    # Command-line entry point: parse the Scrutiny base URL, run the check,
    # and turn any uncaught exception into an UNKNOWN exit status.
    cli = argparse.ArgumentParser(description='')
    cli.add_argument('--scrutiny-endpoint', required=True, help='Base URL for scrutiny.')
    args = cli.parse_args()

    # strip('/') removes slashes from both ends of the value, so a trailing
    # slash does not end up doubled when the API path is appended.
    args.scrutiny_endpoint = args.scrutiny_endpoint.strip('/')

    try:
        main(args.scrutiny_endpoint)
    except Exception as exc:
        from traceback import format_exc

        print(f'UNKNOWN: exception "{exc}"')
        print(format_exc())
        sys.exit(nagios.UNKNOWN)