#!/usr/bin/env python3
"""Nagios-style check that averages Proxmox RRD metrics for a single guest.

The script connects to the Proxmox API, fetches the last hour of RRD data for
a QEMU or LXC guest, averages the most recent samples of every requested
metric and compares each average against the warn/crit levels from --levels.
"""
import argparse
import json
import os
import sys
import traceback
from pathlib import Path

import certifi
import numpy as np
import requests
from proxmoxer import ProxmoxAPI, ResourceException

import checker.nagios as nagios
from checker.markdown import list_to_markdown_table
from checker.result import quit_check
from checker.units import filesize


def _parse_verify(value):
    """Convert the --verify argument to a bool or leave it as a file path.

    The help text tells users to pass "false" to disable verification, but
    the literal string 'false' is truthy and would otherwise be handed to the
    API client as a CA bundle path.
    """
    lowered = value.strip().lower()
    if lowered in ('false', 'no', 'off', '0', ''):
        return False
    if lowered in ('true', 'yes', 'on', '1'):
        return True
    return value


parser = argparse.ArgumentParser(description='Check the Proxmox API for network traffic for a host.')
parser.add_argument('--node', required=True, help='The name and address of Proxmox node in valid JSON in this format: ["bigserver", "192.168.1.222"]. This allows you to use datalists in Director.')
parser.add_argument('--user', required=True, help='The Proxmox user. Something like "monitoring@pve"')
parser.add_argument('--password', required=True, help='API password.')
parser.add_argument('--host', required=True, help='The ID of the host to check.')
parser.add_argument('--type', required=True, choices=['qemu', 'lxc'], help='Type of host. "qemu" or "lxc"')
parser.add_argument('--metrics', required=True, help='What stats to check. Can list multiple seperated by commas. For example, "netin,netout"')
parser.add_argument('--levels', required=True, help='Warning levels. In JSON format: {"netin":{"warn":50, "crit":100, "type": "filesize"}, "netout":{"warn":50, "crit":100, "type": "filesize"}}')
parser.add_argument('--timeframe', default=5, type=int, help='Timeframe to average the data to in minutes. Default: 5 minutes')
parser.add_argument('--verify', default=False, type=_parse_verify, help="What to verify the SSL connection with. Can be a file path, or false to disable verification. If you're having issues with CA certs, try setting it to your system's CA bundle (/etc/ssl/certs/ca-certificates.crt). Default: false (verification disabled)")
parser.add_argument('--verify-force', action='store_true', help="Delete the certifi cert and replace it with whatever you specify in --verify")
parser.add_argument('--table', action='store_true', help='Print the results in a table.')
args = parser.parse_args()


def _repoint_ca_bundle(bundle_path, target):
    """Replace the file at *bundle_path* with a symlink pointing to *target*."""
    bundle = Path(bundle_path)
    # exists() follows symlinks and reports a dangling link as missing, which
    # would make os.symlink() fail with FileExistsError; check the link too.
    if bundle.is_symlink() or bundle.exists():
        os.remove(bundle_path)
    os.symlink(target, bundle_path)
    print(f'Pointed {bundle_path} to {target}')


def main():
    """Run the check, print the status line and exit with the Nagios code.

    Reads the module-level ``args``. Every path ends in ``sys.exit`` (or in
    ``quit_check``, which presumably terminates the process as well - verify
    against checker.result).
    """
    if args.verify_force:
        # Only a real path can be symlinked; a boolean --verify is not enough.
        if not isinstance(args.verify, str):
            print('UNKNOWN: must supply --verify when using --verify-force')
            sys.exit(nagios.UNKNOWN)
        _repoint_ca_bundle(certifi.where(), args.verify)
        _repoint_ca_bundle(requests.certs.where(), args.verify)

    # The window below is samples[-timeframe:-1], so anything under 2 would
    # select no samples at all.
    if args.timeframe < 2:
        print('UNKNOWN: --timeframe must be at least 2')
        sys.exit(nagios.UNKNOWN)

    try:
        metrics_levels = json.loads(args.levels)
    except ValueError as e:
        print('UNKNOWN: Failed to parse --levels JSON:', e)
        sys.exit(nagios.UNKNOWN)
    if not isinstance(metrics_levels, dict) or not all(isinstance(v, dict) for v in metrics_levels.values()):
        print('UNKNOWN: Failed to parse --levels JSON:', 'expected an object of objects')
        sys.exit(nagios.UNKNOWN)

    # Thresholds are emitted as integers in the perf data. Use .get() so that
    # a missing key is reported by the per-metric validation further down
    # instead of raising KeyError here.
    for level in metrics_levels.values():
        level['min'] = level.get('min')
        for key in ('min', 'warn', 'crit'):
            if isinstance(level.get(key), float):
                level[key] = int(level[key])

    try:
        args.node = json.loads(args.node)
        pve_node = args.node[0]
        pve_node_address = args.node[1]
    except (ValueError, TypeError, LookupError) as e:
        print('UNKNOWN: Failed to parse --node JSON:', e)
        sys.exit(nagios.UNKNOWN)

    prox = ProxmoxAPI(pve_node_address, user=args.user, password=args.password, verify_ssl=args.verify)

    # Best effort: the permissions are only used to enrich an error message.
    try:
        user_perms = prox('access/permissions').get()
    except Exception as e:
        user_perms = f'{e.__class__.__name__}: {e}'

    try:
        node_status = prox.nodes(pve_node).status.get()  # test connection and permissions
        if not node_status:
            print('UNKNOWN: PVE API returned no nodes.')
            sys.exit(nagios.UNKNOWN)
    except requests.exceptions.SSLError as e:
        print('UNKNOWN: SSL error ', e)
        print('Using cert:', args.verify)
        print('certifi using cert:', certifi.where())
        print('requests using cert:', requests.certs.where())
        sys.exit(nagios.UNKNOWN)
    except ResourceException as e:
        print('UNKNOWN:', e)
        print(f'Proxmox reported "{args.user}" permissions as:', user_perms)
        sys.exit(nagios.UNKNOWN)
    except Exception as e:
        print('UNKNOWN: failed to connect to Proxmox API:', e)
        # Without this exit the check would carry on with a broken connection.
        sys.exit(nagios.UNKNOWN)

    try:
        api_data = prox(f'nodes/{pve_node}/{args.type}/{args.host}/rrddata?timeframe=hour').get()
    except Exception as e:
        print('UNKNOWN: Failed to fetch API data - ', f'{e.__class__.__name__}: {e}')
        sys.exit(nagios.UNKNOWN)

    # Load the data
    metrics_data = {}
    requested = [name.strip() for name in args.metrics.split(',') if name.strip()]
    for item in requested:
        if item not in metrics_levels:
            print(f'UNKNOWN: missing metric "{item}" in --levels')
            sys.exit(nagios.UNKNOWN)
        for required_key in ('warn', 'crit', 'type'):
            if required_key not in metrics_levels[item]:
                print(f'UNKNOWN: missing key "{required_key}" for metric "{item}" in --levels')
                sys.exit(nagios.UNKNOWN)

        samples = []
        for point in api_data:
            sample = point.get(item)
            if sample is None:
                # Key absent (or null) in this data point: nothing to average.
                continue
            if isinstance(sample, float):
                sample = np.round(sample, 2)
            samples.append(sample)
        metrics_data[item] = samples

    check_data = {}
    exit_code = nagios.OK
    for metric, samples in metrics_data.items():
        entry = check_data[metric] = {}

        # Average the data. Expects the interval to be 1 minute.
        # The newest sample is left out, exactly as the historical [-5:-1]
        # slice did - presumably because the latest RRD point can still be
        # incomplete (TODO confirm). With the default --timeframe of 5 the
        # window is identical to the old hard-coded one.
        window = samples[-args.timeframe:-1]
        if not window:
            # Prevent NaN errors; still give the row a status for --table.
            entry['nan'] = True
            entry['value_str'] = 'NaN'
            entry['status'] = nagios.UNKNOWN
            entry['status_str'] = '[UNKNOWN]'
            continue
        entry['nan'] = False
        avg = np.round(np.average(window), 2)
        entry['value'] = avg

        # NOTE(review): an average of exactly zero is reported as "no data";
        # confirm this is intended for idle guests.
        if not avg:
            quit_check('no data', nagios.STATE_UNKNOWN)

        level = metrics_levels[metric]
        if level['type'] == 'filesize':
            entry['value_str'] = filesize(avg)
            entry['value'] = f'{int(avg)}B'
        else:
            entry['value_str'] = str(avg)

        if avg >= level['crit']:
            entry['status'] = nagios.CRITICAL
            entry['status_str'] = '[CRITICAL]'
        elif avg >= level['warn']:
            entry['status'] = nagios.WARNING
            entry['status_str'] = '[WARNING]'
        else:
            entry['status'] = nagios.OK
            entry['status_str'] = '[OK]'

        # The overall result is the worst individual status.
        if exit_code < entry['status']:
            exit_code = entry['status']

    if exit_code == nagios.OK:
        output_str = 'OK: '
    elif exit_code == nagios.WARNING:
        output_str = 'WARNING: '
    elif exit_code == nagios.CRITICAL:
        output_str = 'CRITICAL: '
    else:
        output_str = 'UNKNOWN: '

    # Check for NaNs: any metric without data makes the whole check UNKNOWN.
    if any(entry['nan'] for entry in check_data.values()):
        output_str = 'UNKNOWN: '
        exit_code = nagios.UNKNOWN

    perf_data = []
    summary_parts = []
    for metric, data in check_data.items():
        summary_parts.append(f"{metric} {data['value_str']}")
        if data['nan']:
            continue
        level = metrics_levels[metric]
        if level['min'] is None:
            perf_data.append(f"'{metric}'={data['value']};{level['warn']};{level['crit']};;")
        else:
            perf_data.append(f"'{metric}'={data['value']};{level['warn']};{level['crit']};{level['min']};")
    output_str += ', '.join(summary_parts)

    # Without --table the perf data must follow on the same line.
    print(output_str.strip(', ').strip(), end=('\n' if args.table else ''))

    perf_data_str = f'| {" ".join(perf_data)}' if perf_data else ''

    if args.table:
        output_table = [('Metric', 'Value', 'Status')]
        for metric, data in check_data.items():
            output_table.append((metric, data['value_str'], data['status_str']))
        print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False))

    print(perf_data_str)
    sys.exit(exit_code)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # Any unexpected failure must still surface as a Nagios UNKNOWN.
        print(f'UNKNOWN: exception "{e}"')
        print(traceback.format_exc())
        sys.exit(nagios.UNKNOWN)