icinga2-checks/check_pve_guest_metrics.py

192 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import json
import os
import sys
import traceback
from pathlib import Path
import certifi
import numpy as np
import requests
import checker.nagios as nagios
from checker.markdown import list_to_markdown_table
from checker.units import filesize
def _parse_verify(value):
    """Convert the raw --verify string into what ``requests`` expects.

    argparse hands every command-line value over as a string, so a literal
    "false" would otherwise reach ``requests`` as a (non-existent) CA bundle
    path instead of disabling verification as the help text promises.

    Returns False/True for the words "false"/"true" (case-insensitive) and
    the unchanged string (treated as a CA bundle path) for anything else.
    """
    lowered = value.strip().lower()
    if lowered == 'false':
        return False
    if lowered == 'true':
        return True
    return value


# Command-line interface of the check. Parsed once at import time; main()
# reads the resulting module-level ``args`` namespace.
parser = argparse.ArgumentParser(description='Check the Proxmox API for network traffic for a host.')
parser.add_argument(
    '--node',
    required=True,
    help='The name and address of Proxmox node in valid JSON in this format: ["bigserver", "192.168.1.222"]. This allows you to use datalists in Director.',
)
parser.add_argument(
    '--user',
    required=True,
    help='The Proxmox user. Something like "monitoring@pve!icinga2"',
)
parser.add_argument('--password', required=True, help='Password.')
parser.add_argument('--host', required=True, help='The ID of the host to check.')
parser.add_argument(
    '--type',
    required=True,
    choices=['qemu', 'lxc'],
    help='Type of host. "qemu" or "lxc"',
)
parser.add_argument(
    '--metrics',
    required=True,
    help='What stats to check. Can list multiple seperated by commas. For example, "netin,netout"',
)
parser.add_argument(
    '--levels',
    required=True,
    help='Warning levels. In JSON format: {"netin":{"warn":50, "crit":100, "type": "filesize"}, "netout":{"warn":50, "crit":100, "type": "filesize"}}',
)
# type=int keeps a user-supplied value the same type as the integer default.
parser.add_argument(
    '--timeframe',
    default=5,
    type=int,
    help='Timeframe to average the data to in minutes. Default: 5 minutes',
)
# The non-string default (True) is not passed through the type converter.
parser.add_argument(
    '--verify',
    default=True,
    type=_parse_verify,
    help="What to verify the SSL connection with. Can be a file path, or false to disable verification. If you're having issues with CA certs, try setting it to your system's CA bundle (/etc/ssl/certs/ca-certificates.crt).",
)
parser.add_argument(
    '--verify-force',
    action='store_true',
    help="Delete the certifi cert and replace it with whatever you specify in --verify",
)
parser.add_argument('--table', action='store_true', help='Print the results in a table.')
args = parser.parse_args()
# def where():
# return args.verify
def _normalize_verify(raw):
    """Return the --verify setting in the form ``requests`` expects.

    Booleans pass through untouched. The strings "false"/"true"
    (case-insensitive) become the matching boolean, because argparse delivers
    command-line values as text and ``requests`` would treat the word "false"
    as a CA bundle path. Any other string is returned unchanged and is used
    as a CA bundle path.
    """
    if isinstance(raw, bool):
        return raw
    lowered = str(raw).strip().lower()
    if lowered == 'false':
        return False
    if lowered == 'true':
        return True
    return raw


def main():
    """Run the check: fetch the guest's RRD data from Proxmox and grade it.

    Reads the module-level ``args`` namespace, requests an auth ticket,
    downloads ``rrddata?timeframe=hour`` for the guest, averages the recent
    samples of every requested metric and compares the average with the
    ``warn``/``crit`` levels from ``--levels``.

    Prints the status line (optionally a table) followed by perf data and
    always terminates through ``sys.exit`` with a ``nagios`` status code.
    Exceptions that are not handled here propagate to the caller.
    """
    verify = _normalize_verify(args.verify)

    if args.verify_force:
        # A bundle path is required here: True/False/'' cannot be symlinked.
        if not isinstance(verify, str) or not verify:
            print('UNKNOWN: must supply --verify when using --verify-force')
            sys.exit(nagios.UNKNOWN)
        for bundle_path in (certifi.where(), requests.certs.where()):
            # lexists() also reports a dangling symlink, which exists() would
            # miss and which would make os.symlink() fail with FileExistsError.
            if os.path.lexists(bundle_path):
                os.remove(bundle_path)
            os.symlink(verify, bundle_path)
            print(f'Pointed {bundle_path} to {verify}')

    try:
        metrics_levels = json.loads(args.levels)
    except Exception as e:
        print('UNKNOWN: Failed to parse --levels JSON:', e)
        sys.exit(nagios.UNKNOWN)

    try:
        node_info = json.loads(args.node)
        pve_node = node_info[0]
        pve_node_address = node_info[1]
    except Exception as e:
        print('UNKNOWN: Failed to parse --node JSON:', e)
        sys.exit(nagios.UNKNOWN)

    try:
        # Both requests honour --verify; the ticket request used to ignore it.
        ticket_response = requests.post(
            f'https://{pve_node_address}:8006/api2/json/access/ticket',
            data={"username": args.user, "password": args.password},
            verify=verify,
        )
        pve_auth_ticket = ticket_response.json()['data']['ticket']
        response = requests.get(
            f'https://{pve_node_address}:8006/api2/json/nodes/{pve_node}/{args.type}/{args.host}/rrddata?timeframe=hour',
            cookies={'PVEAuthCookie': pve_auth_ticket},
            verify=verify,
        )
    except requests.exceptions.SSLError as e:
        print('UNKNOWN: SSL error ', e)
        print('Using cert:', verify)
        print('certifi using cert:', certifi.where())
        print('requests using cert:', requests.certs.where())
        sys.exit(nagios.UNKNOWN)

    try:
        api_data = json.loads(response.text)['data']
    except Exception as e:
        print(f'UNKNOWN: Failed to parse JSON {e}')
        print(response.text)
        sys.exit(nagios.UNKNOWN)

    # Validate the requested metrics and collect their samples.
    metrics_data = {}
    for item in args.metrics.split(','):
        if item not in metrics_levels:
            print(f'UNKNOWN: missing metric "{item}" in --levels')
            sys.exit(nagios.UNKNOWN)
        for required_key in ('warn', 'crit', 'type'):
            if required_key not in metrics_levels[item]:
                print(f'UNKNOWN: missing key "{required_key}" for metric "{item}" in --levels')
                sys.exit(nagios.UNKNOWN)
        samples = []
        for entry in api_data:
            sample = entry.get(item)
            if sample is None:
                # Absent or null samples cannot be averaged; skip them.
                continue
            if isinstance(sample, float):
                sample = np.round(sample, 2)
            samples.append(sample)
        metrics_data[item] = samples

    check_data = {}
    exit_code = nagios.OK
    for metric, samples in metrics_data.items():
        # Average the data. Expects the interval to be 1 minute.
        # NOTE(review): the window is fixed to the four samples before the
        # newest one (TODO from the original author: why [-5:-1]); the
        # --timeframe argument is not consulted here -- confirm intent.
        window = samples[-5:-1]
        if not window:
            # No usable samples (none at all, or only the newest one).
            # Record an explicit UNKNOWN row so that table output still has
            # a status and no NaN reaches the formatting code below.
            check_data[metric] = {
                'nan': True,
                'value_str': 'NaN',
                'status': nagios.UNKNOWN,
                'status_str': '[UNKNOWN]',
            }
            continue

        avg = np.round(np.average(window), 2)
        result = {'nan': False, 'value': avg}
        if metrics_levels[metric]['type'] == 'filesize':
            result['value_str'] = filesize(avg)
            result['value'] = f'{int(avg)}B'
        else:
            result['value_str'] = str(avg)

        if avg >= metrics_levels[metric]['crit']:
            result['status'] = nagios.CRITICAL
            result['status_str'] = '[CRITICAL]'
        elif avg >= metrics_levels[metric]['warn']:
            result['status'] = nagios.WARNING
            result['status_str'] = '[WARNING]'
        else:
            result['status'] = nagios.OK
            result['status_str'] = '[OK]'
        check_data[metric] = result

        # Keep the most severe status seen among the graded metrics.
        if exit_code < result['status']:
            exit_code = result['status']

    if exit_code == nagios.OK:
        output_str = 'OK: '
    elif exit_code == nagios.WARNING:
        output_str = 'WARNING: '
    elif exit_code == nagios.CRITICAL:
        output_str = 'CRITICAL: '
    else:
        output_str = 'UNKNOWN: '

    # Any metric without data turns the whole check UNKNOWN.
    if any(data['nan'] for data in check_data.values()):
        output_str = 'UNKNOWN: '
        exit_code = nagios.UNKNOWN

    perf_data = []
    for metric, data in check_data.items():
        output_str = output_str + f"{metric} {data['value_str']}, "
        if not data['nan']:
            perf_data.append(f"'{metric}'={data['value']};{metrics_levels[metric]['warn']};{metrics_levels[metric]['crit']};;")

    # Without --table the perf data is appended to the same output line.
    print(output_str.strip(', ').strip(), end=('\n' if args.table else ''))

    perf_data_str = f'| {" ".join(perf_data)}' if perf_data else ''

    if args.table:
        output_table = [('Metric', 'Value', 'Status')]
        for metric, data in check_data.items():
            output_table.append((metric, data['value_str'], data['status_str']))
        print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False))

    print(perf_data_str)
    sys.exit(exit_code)
if __name__ == "__main__":
    # Script entry point. main() normally ends via sys.exit(); SystemExit is
    # not an Exception subclass, so it passes straight through this handler.
    # Anything else is reported as UNKNOWN together with its traceback.
    try:
        main()
    except Exception as exc:
        print(f'UNKNOWN: exception "{exc}"')
        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)