icinga2-checks/check_pve_guest_metrics.py

218 lines
8.9 KiB
Python
Raw Normal View History

2023-04-21 23:54:20 -06:00
#!/usr/bin/env python3
import argparse
import json
import os
import sys
import traceback
from pathlib import Path
import certifi
import numpy as np
import requests
from proxmoxer import ProxmoxAPI, ResourceException
2023-04-21 23:54:20 -06:00
import checker.nagios as nagios
from checker.markdown import list_to_markdown_table
from checker.result import quit_check
2023-04-21 23:54:20 -06:00
from checker.units import filesize
# Command-line interface.
# The arguments are parsed at import time and stored in the module-level
# ``args`` namespace, which main() reads directly.
parser = argparse.ArgumentParser(description='Check the Proxmox API for network traffic for a host.')

# Where to connect and how to authenticate.
parser.add_argument(
    '--node', required=True,
    help='The name and address of Proxmox node in valid JSON in this format: ["bigserver", "192.168.1.222"]. This allows you to use datalists in Director.',
)
parser.add_argument('--user', required=True, help='The Proxmox user. Something like "monitoring@pve"')
parser.add_argument('--password', required=True, help='API password.')

# Which guest to inspect.
parser.add_argument('--host', required=True, help='The ID of the host to check.')
parser.add_argument('--type', required=True, choices=['qemu', 'lxc'], help='Type of host. "qemu" or "lxc"')

# What to measure and when to alert.
parser.add_argument(
    '--metrics', required=True,
    help='What stats to check. Can list multiple seperated by commas. For example, "netin,netout"',
)
parser.add_argument(
    '--levels', required=True,
    help='Warning levels. In JSON format: {"netin":{"warn":50, "crit":100, "type": "filesize"}, "netout":{"warn":50, "crit":100, "type": "filesize"}}',
)
# type=int makes a value given on the command line the same type as the
# default and rejects non-numeric input up front.
# NOTE(review): nothing visible in this file reads args.timeframe yet.
parser.add_argument(
    '--timeframe', default=5, type=int,
    help='Timeframe to average the data to in minutes. Default: 5 minutes',
)

# TLS verification.
parser.add_argument(
    '--verify', default=False,
    help="What to verify the SSL connection with. Can be a file path, or false to disable verification. If you're having issues with CA certs, try setting it to your system's CA bundle (/etc/ssl/certs/ca-certificates.crt). Default: false (verification disabled)",
)
parser.add_argument(
    '--verify-force', action='store_true',
    help="Delete the certifi cert and replace it with whatever you specify in --verify",
)

# Output formatting.
parser.add_argument('--table', action='store_true', help='Print the results in a table.')

args = parser.parse_args()
# def where():
# return args.verify
# Slice of each metric's sample list that gets averaged: the four samples
# that precede the newest one.
# NOTE(review): kept exactly as the original ``value[-5:-1]``; presumably the
# newest RRD sample is skipped because it may still be incomplete, and the
# window is not yet driven by --timeframe -- TODO confirm both.
_RECENT_SAMPLES = slice(-5, -1)


def _exit_unknown(*message):
    """Print *message* exactly as ``print`` would, then exit with UNKNOWN."""
    print(*message)
    sys.exit(nagios.UNKNOWN)


def _resolve_verify(raw):
    """Return the value to hand to ``verify_ssl`` for the raw --verify option.

    The help text promises that ``false`` disables verification, but argparse
    delivers it as the (truthy) string ``'false'``, which would otherwise be
    treated as the path of a CA bundle. Any other value is returned untouched.
    """
    if isinstance(raw, str) and raw.strip().lower() in ('', 'false'):
        return False
    return raw


def _repoint_ca_bundle(bundle_path, target):
    """Replace *bundle_path* with a symlink that points at *target*."""
    # lexists() also reports a dangling symlink, which exists() would miss
    # and which would make os.symlink() fail with FileExistsError.
    if os.path.lexists(bundle_path):
        os.remove(bundle_path)
    os.symlink(target, bundle_path)
    print(f'Pointed {bundle_path} to {target}')


def _normalise_levels(levels):
    """Normalise the per-metric threshold dicts from --levels in place.

    Every metric gets a ``min`` key (``None`` when absent). Thresholds that
    JSON parsed as whole-number floats (``50.0``) become ints so they print
    cleanly in perfdata. Fractional thresholds (``0.8``) are kept as they are:
    truncating them would silently change the alerting threshold. Missing
    ``warn``/``crit`` keys are left missing so the caller can report them.
    """
    for spec in levels.values():
        spec['min'] = spec.get('min')
        for key in ('min', 'warn', 'crit'):
            threshold = spec.get(key)
            if isinstance(threshold, float) and threshold.is_integer():
                spec[key] = int(threshold)


def main():
    """Run the check: fetch a guest's RRD data from Proxmox and grade it.

    Reads the module-level ``args`` namespace. Prints a Nagios-style status
    line (optionally followed by a table) plus perfdata, then terminates the
    process via ``sys.exit`` with the worst state found. Any metric without
    usable samples turns the overall result into UNKNOWN.
    """
    verify = _resolve_verify(args.verify)

    if args.verify_force:
        if not verify:
            _exit_unknown('UNKNOWN: must supply --verify when using --verify-force')
        _repoint_ca_bundle(certifi.where(), verify)
        _repoint_ca_bundle(requests.certs.where(), verify)

    try:
        metrics_levels = json.loads(args.levels)
    except Exception as e:
        _exit_unknown('UNKNOWN: Failed to parse --levels JSON:', e)

    if not isinstance(metrics_levels, dict) or not all(isinstance(spec, dict) for spec in metrics_levels.values()):
        _exit_unknown('UNKNOWN: --levels must be a JSON object that maps each metric to an object of thresholds')
    _normalise_levels(metrics_levels)

    try:
        args.node = json.loads(args.node)
        pve_node = args.node[0]
        pve_node_address = args.node[1]
    except Exception as e:
        _exit_unknown('UNKNOWN: Failed to parse --node JSON:', e)

    prox = ProxmoxAPI(pve_node_address, user=args.user, password=args.password, verify_ssl=verify)

    # Best effort only: the permissions are just extra context for the error
    # message printed when the API rejects a request below.
    try:
        user_perms = prox('access/permissions').get()
    except Exception as e:
        user_perms = f'{e.__class__.__name__}: {e}'

    try:
        node_status = prox.nodes(pve_node).status.get()  # test connection and permissions
    except requests.exceptions.SSLError as e:
        print('UNKNOWN: SSL error ', e)
        print('Using cert:', args.verify)
        print('certifi using cert:', certifi.where())
        print('requests using cert:', requests.certs.where())
        sys.exit(nagios.UNKNOWN)
    except ResourceException as e:
        print('UNKNOWN:', e)
        print(f'Proxmox reported "{args.user}" permissions as:', user_perms)
        sys.exit(nagios.UNKNOWN)
    except Exception as e:
        # Previously this branch only printed and then carried on.
        _exit_unknown('UNKNOWN: failed to connect to Proxmox API:', e)
    if not node_status:
        _exit_unknown('UNKNOWN: PVE API returned no nodes.')

    try:
        api_data = prox(f'nodes/{pve_node}/{args.type}/{args.host}/rrddata?timeframe=hour').get()
    except Exception as e:
        _exit_unknown('UNKNOWN: Failed to fetch API data - ', f'{e.__class__.__name__}: {e}')

    # Load the data: one list of samples per requested metric.
    metrics_data = {}
    for item in (name.strip() for name in args.metrics.split(',')):
        if item not in metrics_levels:
            _exit_unknown(f'UNKNOWN: missing metric "{item}" in --levels')
        for required in ('warn', 'crit', 'type'):
            if required not in metrics_levels[item]:
                _exit_unknown(f'UNKNOWN: missing key "{required}" for metric "{item}" in --levels')

        samples = []
        for row in api_data:
            sample = row.get(item)
            if sample is None:
                # The row carries no usable value for this metric.
                continue
            if isinstance(sample, float):
                sample = np.round(sample, 2)
            samples.append(sample)
        metrics_data[item] = samples

    check_data = {}
    exit_code = nagios.OK
    for metric, samples in metrics_data.items():
        levels = metrics_levels[metric]
        entry = check_data[metric] = {}

        # Average the data. Expects the interval to be 1 minute
        window = samples[_RECENT_SAMPLES]
        if not window:
            # Nothing to average (zero samples, or only the skipped newest
            # one). Record a placeholder row, including a status string so
            # that --table can still render it.
            entry['nan'] = True
            entry['value_str'] = 'NaN'
            entry['status_str'] = '[UNKNOWN]'
            continue
        avg = np.round(np.average(window), 2)
        entry['nan'] = False
        entry['value'] = avg

        # NOTE(review): an average of exactly 0 is reported as "no data",
        # which also catches genuinely idle guests -- kept as in the original.
        if not avg:
            quit_check('no data', nagios.STATE_UNKNOWN)

        if levels['type'] == 'filesize':
            entry['value_str'] = filesize(avg)
            entry['value'] = f'{int(avg)}B'
        else:
            entry['value_str'] = str(avg)

        if avg >= levels['crit']:
            entry['status'] = nagios.CRITICAL
            entry['status_str'] = '[CRITICAL]'
        elif avg >= levels['warn']:
            entry['status'] = nagios.WARNING
            entry['status_str'] = '[WARNING]'
        else:
            entry['status'] = nagios.OK
            entry['status_str'] = '[OK]'

        # Keep the worst state seen so far.
        if exit_code < entry['status']:
            exit_code = entry['status']

    prefixes = {
        nagios.OK: 'OK: ',
        nagios.WARNING: 'WARNING: ',
        nagios.CRITICAL: 'CRITICAL: ',
    }
    output_str = prefixes.get(exit_code, 'UNKNOWN: ')

    # Check for NaNs: a metric without data overrides every other state.
    if any(entry['nan'] for entry in check_data.values()):
        output_str = 'UNKNOWN: '
        exit_code = nagios.UNKNOWN

    perf_data = []
    for metric, entry in check_data.items():
        output_str = output_str + f"{metric} {entry['value_str']}, "
        if entry['nan']:
            continue
        levels = metrics_levels[metric]
        # The perfdata "min" field stays empty when no minimum was configured.
        min_field = '' if levels['min'] is None else levels['min']
        perf_data.append(f"'{metric}'={entry['value']};{levels['warn']};{levels['crit']};{min_field};")

    # Without --table the perfdata continues on the same line as the status.
    print(output_str.strip(', ').strip(), end=('\n' if args.table else ''))

    perf_data_str = f'| {" ".join(perf_data)}' if perf_data else ''

    if args.table:
        output_table = [('Metric', 'Value', 'Status')]
        for metric, entry in check_data.items():
            output_table.append((metric, entry['value_str'], entry['status_str']))
        print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False))

    print(perf_data_str)
    sys.exit(exit_code)
if __name__ == "__main__":
    # Last line of defence: whatever escapes main() is reported as an
    # UNKNOWN result, together with the traceback, rather than as an
    # unhandled interpreter error.
    try:
        main()
    except Exception as exc:
        print(f'UNKNOWN: exception "{exc}"')
        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)