icinga2-checks/check_pve_guest_metrics.py

218 lines
8.9 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import json
import os
import sys
import traceback
from pathlib import Path
import certifi
import numpy as np
import requests
from proxmoxer import ProxmoxAPI, ResourceException
import checker.nagios as nagios
from checker.markdown import list_to_markdown_table
from checker.result import quit_check
from checker.units import filesize
def _parse_verify(value):
    """Convert the raw ``--verify`` string into the value handed to the API client.

    The option's help text tells users to pass ``false`` to disable
    verification, but argparse delivers that as the non-empty (truthy) string
    ``'false'``, which would then be used as a CA bundle path. Map it to the
    boolean ``False``; every other value is returned unchanged and treated as
    a path.
    """
    if value.strip().lower() == 'false':
        return False
    return value


# Command-line interface. Parsing happens at import time; main() reads the
# resulting module-level ``args``.
parser = argparse.ArgumentParser(description='Check the Proxmox API for network traffic for a host.')
parser.add_argument('--node', required=True, help='The name and address of Proxmox node in valid JSON in this format: ["bigserver", "192.168.1.222"]. This allows you to use datalists in Director.')
parser.add_argument('--user', required=True, help='The Proxmox user. Something like "monitoring@pve"')
parser.add_argument('--password', required=True, help='API password.')
parser.add_argument('--host', required=True, help='The ID of the host to check.')
parser.add_argument('--type', required=True, choices=['qemu', 'lxc'], help='Type of host. "qemu" or "lxc"')
parser.add_argument('--metrics', required=True, help='What stats to check. Can list multiple seperated by commas. For example, "netin,netout"')
parser.add_argument('--levels', required=True, help='Warning levels. In JSON format: {"netin":{"warn":50, "crit":100, "type": "filesize"}, "netout":{"warn":50, "crit":100, "type": "filesize"}}')
parser.add_argument('--timeframe', default=5, help='Timeframe to average the data to in minutes. Default: 5 minutes')
# argparse only applies ``type`` to string values, so the boolean default
# below is passed through untouched.
parser.add_argument('--verify', default=False, type=_parse_verify,
                    help="What to verify the SSL connection with. Can be a file path, or false to disable verification. If you're having issues with CA certs, try setting it to your system's CA bundle (/etc/ssl/certs/ca-certificates.crt). Default: false (verification disabled)")
parser.add_argument('--verify-force', action='store_true', help="Delete the certifi cert and replace it with whatever you specify in --verify")
parser.add_argument('--table', action='store_true', help='Print the results in a table.')
args = parser.parse_args()
# def where():
# return args.verify
def main():
    """Run the guest-metrics check and exit with the resulting Nagios state.

    Reads the module-level ``args``, fetches the guest's RRD data from the
    Proxmox API, averages each requested metric over a short trailing window
    and compares the average with the thresholds from ``--levels``. The
    summary line, the optional table and the perfdata are printed to stdout
    and the process always ends through ``sys.exit``. Configuration and API
    failures are reported as UNKNOWN.
    """
    if args.verify_force:
        if not args.verify:
            print('UNKNOWN: must supply --verify when using --verify-force')
            sys.exit(nagios.UNKNOWN)
        # Both lookups are handled in turn; when they resolve to the same
        # file the second pass simply recreates the link.
        for locate_bundle in (certifi.where, requests.certs.where):
            _replace_ca_bundle(args.verify, locate_bundle())

    try:
        metrics_levels = json.loads(args.levels)
    except Exception as e:
        print('UNKNOWN: Failed to parse --levels JSON:', e)
        sys.exit(nagios.UNKNOWN)

    # Normalise thresholds: 'min' is optional and defaults to None, and float
    # thresholds are truncated to int. Missing 'warn'/'crit' keys are left for
    # the explicit validation further down instead of raising KeyError here.
    for level in metrics_levels.values():
        level['min'] = level.get('min')
        for key in ('min', 'warn', 'crit'):
            if isinstance(level.get(key), float):
                level[key] = int(level[key])

    try:
        args.node = json.loads(args.node)
        pve_node = args.node[0]
        pve_node_address = args.node[1]
    except Exception as e:
        print('UNKNOWN: Failed to parse --node JSON:', e)
        sys.exit(nagios.UNKNOWN)

    prox = ProxmoxAPI(pve_node_address, user=args.user, password=args.password, verify_ssl=args.verify)

    # Best-effort lookup used only to enrich the permission error message
    # below; a failure here is recorded as text rather than aborting.
    try:
        user_perms = prox('access/permissions').get()
    except Exception as e:
        user_perms = f'{e.__class__.__name__}: {e}'

    try:
        t = prox.nodes(pve_node).status.get()  # test connection and permissions
        if not t:
            print('UNKNOWN: PVE API returned no nodes.')
            sys.exit(nagios.UNKNOWN)
    except requests.exceptions.SSLError as e:
        print('UNKNOWN: SSL error ', e)
        print('Using cert:', args.verify)
        print('certifi using cert:', certifi.where())
        print('requests using cert:', requests.certs.where())
        sys.exit(nagios.UNKNOWN)
    except ResourceException as e:
        print('UNKNOWN:', e)
        print(f'Proxmox reported "{args.user}" permissions as:', user_perms)
        sys.exit(nagios.UNKNOWN)
    except Exception as e:
        print('UNKNOWN: failed to connect to Proxmox API:', e)
        # Stop here; previously execution fell through to the next API call
        # and produced a second, misleading UNKNOWN line.
        sys.exit(nagios.UNKNOWN)

    try:
        api_data = prox(f'nodes/{pve_node}/{args.type}/{args.host}/rrddata?timeframe=hour').get()
    except Exception as e:
        print('UNKNOWN: Failed to fetch API data - ', f'{e.__class__.__name__}: {e}')
        sys.exit(nagios.UNKNOWN)

    # Validate the requested metrics against --levels and load their samples.
    metrics_data = {}
    for item in args.metrics.split(','):
        if item not in metrics_levels:
            print(f'UNKNOWN: missing metric "{item}" in --levels')
            sys.exit(nagios.UNKNOWN)
        for required in ('warn', 'crit', 'type'):
            if required not in metrics_levels[item]:
                print(f'UNKNOWN: missing key "{required}" for metric "{item}" in --levels')
                sys.exit(nagios.UNKNOWN)
        metrics_data[item] = _collect_samples(api_data, item)

    # Evaluate every metric; the overall state is the worst individual one.
    check_data = {}
    exit_code = nagios.OK
    for metric, samples in metrics_data.items():
        result = _evaluate_metric(samples, metrics_levels[metric])
        check_data[metric] = result
        if not result['nan']:
            exit_code = max(exit_code, result['status'])

    if exit_code == nagios.OK:
        output_str = 'OK: '
    elif exit_code == nagios.WARNING:
        output_str = 'WARNING: '
    elif exit_code == nagios.CRITICAL:
        output_str = 'CRITICAL: '
    else:
        output_str = 'UNKNOWN: '

    # A metric without usable data makes the whole check UNKNOWN.
    if any(data['nan'] for data in check_data.values()):
        output_str = 'UNKNOWN: '
        exit_code = nagios.UNKNOWN

    output_str += ', '.join(f"{metric} {data['value_str']}" for metric, data in check_data.items())

    perf_data = []
    for metric, data in check_data.items():
        if data['nan']:
            continue
        level = metrics_levels[metric]
        min_field = '' if level['min'] is None else level['min']
        perf_data.append(f"'{metric}'={data['value']};{level['warn']};{level['crit']};{min_field};")

    # Without --table the perfdata is appended to the summary line itself.
    print(output_str.strip(', ').strip(), end=('\n' if args.table else ''))

    perf_data_str = f'| {" ".join(perf_data)}' if perf_data else ''

    if args.table:
        output_table = [('Metric', 'Value', 'Status')]
        for metric, data in check_data.items():
            output_table.append((metric, data['value_str'], data['status_str']))
        print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False))

    print(perf_data_str)
    sys.exit(exit_code)


def _replace_ca_bundle(bundle_path, target):
    """Replace the file at *target* with a symlink pointing to *bundle_path*."""
    # lexists() is also true for a dangling symlink, which a plain existence
    # check misses and which would make os.symlink() raise FileExistsError.
    if os.path.lexists(target):
        os.remove(target)
    os.symlink(bundle_path, target)
    print(f'Pointed {target} to {bundle_path}')


def _collect_samples(api_data, metric):
    """Return the readings of *metric* found in *api_data*, in API order.

    Entries that do not carry the metric are skipped; float readings are
    rounded to two decimals.
    """
    samples = []
    for entry in api_data:
        reading = entry.get(metric)
        if reading is None:
            # Key absent, or a JSON null that would break the averaging.
            continue
        if isinstance(reading, float):
            reading = np.round(reading, 2)
        samples.append(reading)
    return samples


def _evaluate_metric(samples, level):
    """Average the recent *samples* and grade the result against *level*.

    Returns a dict with the keys ``nan``, ``value_str``, ``status`` and
    ``status_str``, plus ``value`` when data was available. *level* is the
    metric's entry from ``--levels`` (``warn``, ``crit``, ``type``).
    """
    # Average the data. Expects the interval to be 1 minute.
    # TODO: why [-5:-1]
    # NOTE(review): --timeframe is parsed but not consulted anywhere in the
    # visible code; the window is hard-coded here.
    window = samples[-5:-1]
    if not window:
        # No samples, or only one (which the slice drops): averaging would
        # yield NaN, so report the metric as having no data instead.
        return {
            'nan': True,
            'value_str': 'NaN',
            'status': nagios.UNKNOWN,
            'status_str': '[UNKNOWN]',
        }

    avg = np.round(np.average(window), 2)
    result = {'nan': False, 'value': avg}

    if not avg:
        # quit_check is a project helper; presumably it prints the message
        # and exits -- TODO confirm.
        quit_check('no data', nagios.STATE_UNKNOWN)

    if level['type'] == 'filesize':
        result['value_str'] = filesize(avg)
        result['value'] = f'{int(avg)}B'
    else:
        result['value_str'] = str(avg)

    if avg >= level['crit']:
        result['status'] = nagios.CRITICAL
        result['status_str'] = '[CRITICAL]'
    elif avg >= level['warn']:
        result['status'] = nagios.WARNING
        result['status_str'] = '[WARNING]'
    else:
        result['status'] = nagios.OK
        result['status_str'] = '[OK]'
    return result
if __name__ == "__main__":
    # Script entry point. Any exception that escapes main() is reported on
    # stdout together with its traceback, and the process exits with the
    # UNKNOWN state. sys.exit() calls made inside main() raise SystemExit,
    # which this handler does not intercept.
    try:
        main()
    except Exception as err:
        print(f'UNKNOWN: exception "{err}"')
        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)