#!/usr/bin/env python3 import argparse import json import os import sys import traceback from pathlib import Path import certifi import numpy as np import requests import checker.nagios as nagios from checker.markdown import list_to_markdown_table from checker.units import filesize parser = argparse.ArgumentParser(description='Check the Proxmox API for network traffic for a host.') parser.add_argument('--node', required=True, help='The name and address of Proxmox node in valid JSON in this format: ["bigserver", "192.168.1.222"]. This allows you to use datalists in Director.') parser.add_argument('--user', required=True, help='The Proxmox user. Something like "monitoring@pve!icinga2"') parser.add_argument('--password', required=True, help='Password.') parser.add_argument('--host', required=True, help='The ID of the host to check.') parser.add_argument('--type', required=True, choices=['qemu', 'lxc'], help='Type of host. "qemu" or "lxc"') parser.add_argument('--metrics', required=True, help='What stats to check. Can list multiple seperated by commas. For example, "netin,netout"') parser.add_argument('--levels', required=True, help='Warning levels. In JSON format: {"netin":{"warn":50, "crit":100, "type": "filesize"}, "netout":{"warn":50, "crit":100, "type": "filesize"}}') parser.add_argument('--timeframe', default=5, help='Timeframe to average the data to in minutes. Default: 5 minutes') parser.add_argument('--verify', default=True, help="What to verify the SSL connection with. Can be a file path, or false to disable verification. If you're having issues with CA certs, try setting it to your system's CA bundle (/etc/ssl/certs/ca-certificates.crt).") parser.add_argument('--verify-force', action='store_true', help="Delete the certifi cert and replace it with whatever you specify in --verify") parser.add_argument('--table', action='store_true', help='Print the results in a table.') args = parser.parse_args() # def where(): # return args.verify def main(): if args.verify_force: if not args.verify: print('UNKNOWN: must supply --verify when using --verify-force') sys.exit(nagios.UNKNOWN) if Path(certifi.where()).exists(): os.remove(certifi.where()) os.symlink(args.verify, certifi.where()) print(f'Pointed {certifi.where()} to {args.verify}') if Path(requests.certs.where()).exists(): os.remove(requests.certs.where()) os.symlink(args.verify, requests.certs.where()) print(f'Pointed {requests.certs.where()} to {args.verify}') try: metrics_levels = json.loads(args.levels) except Exception as e: print('UNKNOWN: Failed to parse --levels JSON:', e) sys.exit(nagios.UNKNOWN) try: args.node = json.loads(args.node) pve_node = args.node[0] pve_node_address = args.node[1] except Exception as e: print('UNKNOWN: Failed to parse --node JSON:', e) sys.exit(nagios.UNKNOWN) # requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) try: pve_auth_ticket = requests.post(f'https://{pve_node_address}:8006/api2/json/access/ticket', data={"username": args.user, "password": args.password}).json()['data']['ticket'] response = requests.get(f'https://{pve_node_address}:8006/api2/json/nodes/{pve_node}/{args.type}/{args.host}/rrddata?timeframe=hour', # headers={"Authorization": f'PVEAPIToken={args.user}={args.token}'}, cookies={'PVEAuthCookie': pve_auth_ticket}, verify=args.verify) except requests.exceptions.SSLError as e: print('UNSKNOWN: SSL error ', e) print('Using cert:', args.verify) print('certifi using cert:', certifi.where()) print('requests using cert:', requests.certs.where()) sys.exit(nagios.UNKNOWN) try: api_data = json.loads(response.text)['data'] except Exception as e: print(f'UNKNOWN: Failed to parse JSON {e}') print(response.text) sys.exit(nagios.UNKNOWN) # Load the data metrics_data = {} for item in args.metrics.split(','): if item not in metrics_levels.keys(): print(f'UNKNOWN: missing metric "{item}" in --levels') sys.exit(nagios.UNKNOWN) if 'warn' not in metrics_levels[item].keys(): print(f'UNKNOWN: missing key "warn" for metric "{item}" in --levels') sys.exit(nagios.UNKNOWN) if 'crit' not in metrics_levels[item].keys(): print(f'UNKNOWN: missing key "crit" for metric "{item}" in --levels') sys.exit(nagios.UNKNOWN) if 'type' not in metrics_levels[item].keys(): print(f'UNKNOWN: missing key "type" for metric "{item}" in --levels') sys.exit(nagios.UNKNOWN) metrics_data[item] = [] for m in api_data: for k, v in m.items(): if k == item: if isinstance(v, float): v = np.round(v, 2) metrics_data[item].append(v) check_data = {} exit_code = nagios.OK for metric, value in metrics_data.items(): check_data[metric] = {} # Average the data. Expects the interval to be 1 minute avg = np.round(np.average(value[-5:-1]), 2) check_data[metric]['value'] = avg if metrics_levels[metric]['type'] == 'filesize': check_data[metric]['value_str'] = filesize(avg) check_data[metric]['value'] = f'{int(avg)}B' else: check_data[metric]['value_str'] = str(avg) if avg >= metrics_levels[metric]['crit']: check_data[metric]['status'] = nagios.CRITICAL check_data[metric]['status_str'] = '[CRITICAL]' elif avg >= metrics_levels[metric]['warn']: check_data[metric]['status'] = nagios.WARN check_data[metric]['status_str'] = '[WARNING]' else: check_data[metric]['status'] = nagios.OK check_data[metric]['status_str'] = '[OK]' if exit_code < check_data[metric]['status']: exit_code = check_data[metric]['status'] if exit_code == nagios.OK: output_str = 'OK: ' elif exit_code == nagios.WARNING: output_str = 'WARNING: ' elif exit_code == nagios.CRITICAL: output_str = 'CRITICAL: ' perf_data = [] for metric, data in check_data.items(): output_str = output_str + f"{metric} {data['value_str']}, " perf_data.append(f"'{metric}'={data['value']};{metrics_levels[metric]['warn']};{metrics_levels[metric]['crit']};;") print(output_str.strip(', ').strip(), end=('\n' if args.table else '')) perf_data_str = f'| {" ".join(perf_data)}' if args.table: output_table = [('Metric', 'Value', 'Status')] for metric, data in check_data.items(): output_table.append((metric, data['value_str'], data['status_str'])) print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False)) # else: # perf_data_str = ' ' + perf_data_str print(perf_data_str) sys.exit(exit_code) if __name__ == "__main__": try: main() except Exception as e: print(f'UNKNOWN: exception "{e}"') print(traceback.format_exc()) sys.exit(nagios.UNKNOWN)