diff --git a/Other/check_curl old b/Other/check_curl old index e1739ae..e0c725f 100644 --- a/Other/check_curl old +++ b/Other/check_curl old @@ -141,11 +141,11 @@ if [ -z "$url" ] || [ $# -eq 0 ]; then fi proxyarg="" -if [ ! -z $proxy ] ; then +if [ -n $proxy ] ; then proxyarg=" -x "$proxy" " fi headerarg="" -if [ ! -z "$header" ] ; then +if [ -n "$header" ] ; then headerarg=' -H "'$header'" ' fi followarg="" @@ -162,7 +162,7 @@ if [ $cookies -eq 1 ] ; then cookiesarg=" -c ${COOKIE_JAR_TEMP_PATH} -b ${COOKIE_JAR_TEMP_PATH}" fi bodyarg="" -if [ ! -z $body ]; then +if [ -n $body ]; then body=$(echo $body| sed "s/\"/\\\\\"/g") bodyarg=" --data \""$body"\"" if [ $encodeurl -eq 1 ]; then diff --git a/check_bandwidth.py b/check_bandwidth.py index cba0e32..5dee941 100755 --- a/check_bandwidth.py +++ b/check_bandwidth.py @@ -8,14 +8,15 @@ import traceback import psutil import checker.nagios as nagios -from checker.markdown import list_to_markdown_table +from checker.linuxfabric.base import get_state +from checker import list_to_markdown_table, dict_to_perfdata, print_icinga2_check_status parser = argparse.ArgumentParser(description='Check network interface bandwidth utilization.') -parser.add_argument('--bandwidth', type=float, required=True, help='Bandwidth speed in Mbps. Used to calculate percentage.') +parser.add_argument('--bandwidth', type=float, default=0, help='Bandwidth speed in Mbps. Used to calculate percentage. Default is 0 which disables warning and critical levels.') parser.add_argument('--critical', type=int, default=75, help='Critical if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--warn', type=int, default=50, help='Warning if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--max', type=int, default=None, help='Set the max value the bandwidth can be. 
Useful for graphs and whatever.') -parser.add_argument('--ignore', nargs='*', default=[], help='Interface names to ignore, separated by a space.') +parser.add_argument('--ignore', nargs='*', default=['lo'], help='Interface names to ignore, separated by a space. Default: lo') parser.add_argument('--ignore-re', default=None, help='Regex matching interface names to ignore.') args = parser.parse_args() @@ -71,8 +72,8 @@ def calculate_network_traffic(interface, interval=1): def main(): data = [] - warn_value = (args.bandwidth * args.warn / 100) - crit_value = (args.bandwidth * args.critical / 100) + warn_value = (args.bandwidth * args.warn / 100) if args.bandwidth else 0 + crit_value = (args.bandwidth * args.critical / 100) if args.bandwidth else 0 # Get network interface statistics net_io_counters = psutil.net_io_counters(pernic=True) @@ -89,46 +90,51 @@ def main(): critical = [] warn = [] ok = [] - perf_data = [] + perfdata = {} + for i in range(len(data)): interface = data[i][0] bandwidth_utilization = data[i][3] - if bandwidth_utilization >= crit_value: + state_code = get_state(bandwidth_utilization, warn_value, crit_value, 'lt') + + if state_code == nagios.STATE_CRIT: critical.append(interface) state = 'critical' - exit_code = nagios.CRITICAL - elif bandwidth_utilization >= warn_value: + exit_code = max(exit_code, nagios.CRITICAL) + elif state_code == nagios.STATE_WARN: warn.append(interface) state = 'warning' - if exit_code < nagios.WARNING: - exit_code = nagios.WARNING + exit_code = max(exit_code, nagios.WARNING) else: ok.append(interface) state = 'ok' - data[i][4] = f'[{state.upper()}]' - perf_data.append(f'{interface}={round(bandwidth_utilization, 2)}Mbps;{warn_value};{crit_value};{f"0;{args.max};" if args.max else ""} ') - # Print the status + data[i][4] = f'[{state.upper()}]' + perfdata.update({ + interface: { + 'value': round(bandwidth_utilization, 2), + 'warn': warn_value, + 'crit': crit_value, + 'min': 0 if args.max else None, + 'unit': 'Mbps' + } + }) + 
if exit_code == nagios.CRITICAL: - status = 'CRITICAL' listed_interfaces = [*critical, *warn] elif exit_code == nagios.WARNING: - status = 'WARNING' listed_interfaces = warn else: - status = 'OK' listed_interfaces = ok listed_glances = [] for interface in listed_interfaces: - listed_glances.append(f'{interface}: {round(get_interface_data(interface, data)[3], 2)}Mbps') - print(f'{status} - {", ".join(listed_glances)}') + listed_glances.append(f'{interface}: {round(get_interface_data(interface, data)[3], 2)} Mbps') data = [(x[0], f'{round(x[3], 2)} Mbps', x[4]) for x in data] data.insert(0, ('Interface', 'Bandwidth', 'State')) - print(list_to_markdown_table(data, align='left', seperator='!', borders=False)) - print(f'|{"".join(perf_data)}') + print_icinga2_check_status(f'{", ".join(listed_glances)}\n{list_to_markdown_table(data, align="left", seperator="!", borders=False)}', exit_code, perfdata) sys.exit(exit_code) diff --git a/check_chatgpt_bot.py b/check_chatgpt_bot.py deleted file mode 100755 index 9256d56..0000000 --- a/check_chatgpt_bot.py +++ /dev/null @@ -1,226 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import asyncio -import json -import os -import sys -import tempfile -import traceback -import urllib - -import numpy as np -import requests -from PIL import Image -from nio import AsyncClient, AsyncClientConfig, LoginResponse, RoomSendError -from urllib3.exceptions import InsecureRequestWarning - -from checker import nagios -from checker.synapse_client import send_image, write_login_details_to_disk - -parser = argparse.ArgumentParser(description='') -parser.add_argument('--user', required=True, help='User ID for the bot.') -parser.add_argument('--pw', required=True, help='Password for the bot.') -parser.add_argument('--hs', required=True, help='Homeserver of the bot.') -parser.add_argument('--admin-endpoint', required=True, help='Admin endpoint that will be called to purge media for this user.') -parser.add_argument('--room', required=True, help='The 
room the bot should send its test messages in.') -parser.add_argument('--check-domain', required=True, help='The domain that should be present.') -parser.add_argument('--media-cdn-redirect', default='true', help='If set, the server must respond with a redirect to the media CDN domain.') -parser.add_argument('--required-headers', nargs='*', help="If these headers aren't set to the correct value, critical. Use the format 'key=value") -parser.add_argument('--auth-file', help="File to cache the bot's login details to.") -parser.add_argument('--timeout', type=float, default=90, help='Request timeout limit.') -parser.add_argument('--warn', type=float, default=2.0, help='Manually set warn level.') -parser.add_argument('--crit', type=float, default=2.5, help='Manually set critical level.') -args = parser.parse_args() - -if args.media_cdn_redirect == 'true': - args.media_cdn_redirect = True -elif args.media_cdn_redirect == 'false': - args.media_cdn_redirect = False -else: - print('UNKNOWN: could not parse the value for --media-cdn-redirect') - sys.exit(nagios.UNKNOWN) - - -def verify_media_header(header: str, header_dict: dict, good_value: str = None, warn_value: str = None, critical_value: str = None): - """ - If you don't specify good_value, warn_value, or critical_value then the header will only be checked for existience. 
- """ - - # Convert everything to lowercase strings to prevent any wierdness - header_dict = {k.lower(): v for k, v in header_dict.items()} - header = header.lower() - header_value = str(header_dict.get(header)) - warn_value = str(warn_value) - critical_value = str(critical_value) - if not header_value: - return f'CRITICAL: missing header\n"{header}"', nagios.CRITICAL - - if good_value: - good_value = str(good_value) - if header_value == good_value: - return f'OK: {header}: "{header_value}"', nagios.OK - else: - return f'CRITICAL: {header} is not "{good_value}", is "{header_value}"', nagios.CRITICAL - # elif warn_value and header_value == warn_value: - # return f'WARN: {header}: "{header_value}"', nagios.WARNING - # elif critical_value and header_value == critical_value: - # return f'CRITICAL: {header}: "{header_value}"', nagios.CRITICAL - return f'OK: {header} is present', nagios.OK # with value "{header_value}"' - - -async def main() -> None: - exit_code = nagios.OK - - async def cleanup(client, test_image_path, image_event_id=None): - nonlocal exit_code - # Clean up - if image_event_id: - await client.room_redact(args.room, image_event_id) - os.remove(test_image_path) - await client.close() - - requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) - try: - r = requests.delete(f'{args.admin_endpoint}/_synapse/admin/v1/users/{args.user}/media', headers={'Authorization': f'Bearer {client.access_token}'}, verify=False) - if r.status_code != 200: - if nagios.WARNING < exit_code: - exit_code = nagios.WARNING - return f"WARN: failed to purge media for this user.\n{r.text}" - else: - return None - except Exception as e: - if nagios.WARNING < exit_code: - exit_code = nagios.WARNING - return f"WARN: failed to purge media for this user.\n{e}" - - client = AsyncClient(args.hs, args.user, config=AsyncClientConfig(request_timeout=args.timeout, max_timeout_retry_wait_time=10)) - if args.auth_file: - # If there are no previously-saved credentials, we'll 
use the password - if not os.path.exists(args.auth_file): - resp = await client.login(args.pw) - - # check that we logged in successfully - if isinstance(resp, LoginResponse): - write_login_details_to_disk(resp, args.hs, args.auth_file) - else: - print(f'CRITICAL: failed to log in.\n{resp}') - sys.exit(nagios.CRITICAL) - else: - # Otherwise the config file exists, so we'll use the stored credentials - with open(args.auth_file, "r") as f: - config = json.load(f) - client = AsyncClient(config["homeserver"]) - client.access_token = config["access_token"] - client.user_id = config["user_id"] - client.device_id = config["device_id"] - else: - await client.login(args.pw) - - await client.join(args.room) - - # Create a random image - imarray = np.random.rand(100, 100, 3) * 255 - im = Image.fromarray(imarray.astype('uint8')).convert('RGBA') - _, test_image_path = tempfile.mkstemp() - test_image_path = test_image_path + '.png' - im.save(test_image_path) - - # Send the image and get the event ID - image_event_id = (await send_image(client, args.room, test_image_path)) - if isinstance(image_event_id, RoomSendError): - await cleanup(client, test_image_path) - print(f'CRITICAL: failed to send message.\n{image_event_id}') - sys.exit(nagios.CRITICAL) - image_event_id = image_event_id.event_id - - # Get the event - image_event = (await client.room_get_event(args.room, image_event_id)).event - - # convert mxc:// to http:// - target_file_url = await client.mxc_to_http(image_event.url) - - # Check the file. Ignore the non-async thing here, it doesn't matter in this situation. - # Remember: Cloudflare does not cache non-GET requests. 
- r = requests.head(target_file_url, allow_redirects=False) - - prints = [] - - if r.status_code != 200 and not args.media_cdn_redirect: - await cleanup(client, test_image_path, image_event_id=image_event_id) - prints.append(f'CRITICAL: status code is "{r.status_code}"') - sys.exit(nagios.CRITICAL) - else: - prints.append(f'OK: status code is "{r.status_code}"') - - headers = dict(r.headers) - - # Check domain - if args.media_cdn_redirect: - if 'location' in headers: - domain = urllib.parse.urlparse(headers['location']).netloc - if domain != args.check_domain: - exit_code = nagios.CRITICAL - prints.append(f'CRITICAL: redirect to media CDN domain is "{domain}"') - else: - prints.append(f'OK: media CDN domain is "{domain}"') - else: - exit_code = nagios.CRITICAL - prints.append(f'CRITICAL: was not redirected to the media CDN domain.') - - # Make sure we aren't redirected if we're a Synapse server - test = requests.head(target_file_url, headers={'User-Agent': 'Synapse/1.77.3'}, allow_redirects=False) - if test.status_code != 200: - prints.append('CRITICAL: Synapse user-agent is redirected with status code', test.status_code) - exit_code = nagios.CRITICAL - else: - prints.append(f'OK: Synapse user-agent is not redirected.') - else: - if 'location' in headers: - exit_code = nagios.CRITICAL - prints.append(f"CRITICAL: recieved 301 to {urllib.parse.urlparse(headers['location']).netloc}") - else: - prints.append(f'OK: was not redirected.') - - if args.required_headers: - # Icinga may pass the values as one string - if len(args.required_headers) == 1: - args.required_headers = args.required_headers[0].split(' ') - for item in args.required_headers: - key, value = item.split('=') - header_chk, code = verify_media_header(key, headers, good_value=value) - prints.append(header_chk) - if code > exit_code: - exit_code = code - - # results = [verify_media_header('synapse-media-local-status', headers), verify_media_header('synapse-media-s3-status', headers, good_value='200'), 
verify_media_header('synapse-media-server', headers, good_value='s3')] - # for header_chk, code in results: - # prints.append(header_chk) - # if code > exit_code: - # exit_code = code - - clean_msg = await cleanup(client, test_image_path, image_event_id=image_event_id) - - if exit_code == nagios.OK: - print('OK: media CDN is good.') - elif exit_code == nagios.UNKNOWN: - print('UNKNOWN: media CDN is bad.') - elif exit_code == nagios.WARNING: - print('WARNING: media CDN is bad.') - elif exit_code == nagios.CRITICAL: - print('CRITICAL: media CDN is bad.') - for msg in prints: - print(msg) - - if clean_msg: - print(clean_msg) - - sys.exit(exit_code) - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except Exception as e: - print(f'UNKNOWN: exception\n{e}') - print(traceback.format_exc()) - sys.exit(nagios.UNKNOWN) diff --git a/check_nginx.py b/check_nginx.py index 5c12848..2de95d4 100755 --- a/check_nginx.py +++ b/check_nginx.py @@ -2,20 +2,11 @@ import argparse import sys +import traceback -import requests - -from checker import nagios - - -def check_nginx_status(url): - try: - response = requests.get(url) - response.raise_for_status() - return response.text - except requests.exceptions.RequestException as e: - print("CRITICAL - Unable to connect to Nginx stub_status: {}".format(e)) - sys.exit(2) +from checker import nagios, print_icinga2_check_status +from checker.http import get_with_retry +from checker.linuxfabric.base import get_state def parse_nginx_status(status): @@ -38,49 +29,70 @@ def parse_nginx_status(status): data["waiting"] = int(parts[5]) return data except Exception as e: - print(f'UNKNOWN: failed to parse status page: {e}') + print_icinga2_check_status(f'failed to parse status page: {e}', nagios.UNKNOWN) sys.exit(nagios.UNKNOWN) def main(): parser = argparse.ArgumentParser(description="Check Nginx status using stub_status.") parser.add_argument("--url", required=True, help="URL to Nginx stub_status.") - 
parser.add_argument("--critical-active", type=int, default=0, help="Critical threshold for active connections. Default: disabled") - parser.add_argument("--warning-active", type=int, default=0, help="Warning threshold for active connections. Default: disabled") - parser.add_argument("--critical-waiting", type=int, default=0, help="Critical threshold for waiting connections. Default: disabled") - parser.add_argument("--warning-waiting", type=int, default=0, help="Warning threshold for waiting connections. Default: disabled") + parser.add_argument("--critical-active", type=int, default=None, help="Critical threshold for active connections. Default: 0 (disabled)") + parser.add_argument("--warning-active", type=int, default=None, help="Warning threshold for active connections. Default: 0 (disabled)") + parser.add_argument("--critical-waiting", type=int, default=None, help="Critical threshold for waiting connections. Default: 0 (disabled)") + parser.add_argument("--warning-waiting", type=int, default=None, help="Warning threshold for waiting connections. 
Default: 0 (disabled)") args = parser.parse_args() - status = check_nginx_status(args.url) + status = get_with_retry(args.url).text data = parse_nginx_status(status) - status_str = "Active connections: {active_connections}, Waiting: {waiting}, Accepted: {accepted}, Handled: {handled}, Requests: {requests}, Reading: {reading}, Writing: {writing}".format(**data) + perfdata_dict = { + "active_connections": { + "value": data["active_connections"], + "warn": args.warning_active, + "crit": args.critical_active, + }, + "waiting": { + "value": data["waiting"], + "warn": args.warning_waiting, + "crit": args.critical_waiting, + }, + "accepted": {"value": data["accepted"]}, + "handled": {"value": data["handled"]}, + "requests": {"value": data["requests"]}, + "reading": {"value": data["reading"]}, + "writing": {"value": data["writing"]}, + } - if args.warning_active > 0 and args.critical_active > 0 and args.warning_waiting > 0 and args.critical_waiting > 0: - perfdata = "| active_connections={active_connections};{warning};{critical} waiting={waiting};{warning_waiting};{critical_waiting} accepted={accepted} handled={handled} requests={requests} reading={reading} writing={writing}".format( - warning=args.warning_active, - critical=args.critical_active, - warning_waiting=args.warning_waiting, - critical_waiting=args.critical_waiting, - **data + return_code = nagios.STATE_OK + if args.warning_active or args.critical_active: + active_connections_state = get_state( + data["active_connections"], args.warning_active, args.critical_active, "ge" ) - if data["active_connections"] >= args.critical_active or data["waiting"] >= args.critical_waiting: - print("CRITICAL:", status_str, perfdata) - sys.exit(nagios.CRITICAL) - elif data["active_connections"] >= args.warning_active or data["waiting"] >= args.warning_waiting: - print("WARNING:", status_str, perfdata) - sys.exit(nagios.WARNING) - else: - print("OK:", status_str) - sys.exit(nagios.OK) else: - perfdata = "| 
active_connections={active_connections} waiting={waiting} accepted={accepted} handled={handled} requests={requests} reading={reading} writing={writing}".format( - **data - ) - print("OK:", status_str) - print('Critical and warning levels disabled.', perfdata) - sys.exit(nagios.OK) + active_connections_state = nagios.STATE_OK + return_code = max(active_connections_state, return_code) + + if args.warning_waiting or args.critical_waiting: + waiting_state = get_state(data["waiting"], args.warning_waiting, args.critical_waiting, "ge") + else: + waiting_state = nagios.STATE_OK + return_code = max(waiting_state, return_code) + + # if active_connections_state == nagios.STATE_CRIT or waiting_state == nagios.STATE_CRIT: + # return_code = nagios.CRITICAL + # elif active_connections_state == nagios.STATE_WARN or waiting_state == nagios.STATE_WARN: + # return_code = nagios.WARNING + # else: + # return_code = nagios.OK + + status_str = "Active connections: {active_connections}, Waiting: {waiting}, Accepted: {accepted}, Handled: {handled}, Requests: {requests}, Reading: {reading}, Writing: {writing}".format(**data) + print_icinga2_check_status(status_str, return_code, perfdata_dict) + sys.exit(return_code) if __name__ == "__main__": - main() + try: + main() + except Exception as e: + print_icinga2_check_status(f'exception "{e}" \n {traceback.format_exc()}', nagios.UNKNOWN) + sys.exit(nagios.UNKNOWN) diff --git a/check_opnsense_traffic_for_host.py b/check_opnsense_traffic_for_host.py index 01c3969..d22d960 100755 --- a/check_opnsense_traffic_for_host.py +++ b/check_opnsense_traffic_for_host.py @@ -1,15 +1,17 @@ #!/usr/bin/env python3 import argparse import sys -import time import traceback -from ipaddress import ip_address, ip_network +from ipaddress import ip_network import numpy as np import requests from urllib3.exceptions import InsecureRequestWarning import checker.nagios as nagios +from checker import print_icinga2_check_status +from checker.http import get_with_retry +from 
checker.linuxfabric.base import get_state from checker.markdown import list_to_markdown_table from checker.units import filesize @@ -71,36 +73,33 @@ def main(): interface_names[interface['name']] = name if not len(interface_names.keys()): - print(f'UNKNOWN: did not find any valid interface names! Double-check the name.') + print_icinga2_check_status('did not find any valid interface names! Double-check the name.', nagios.STATE_UNKNOWN) sys.exit(nagios.UNKNOWN) for name, interface in interface_names.items(): # Fetch the data traffic_data = [] for _ in range(args.duration): - start_time = time.time() - response = requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/top/{interface}', - headers={'Accept': 'application/json'}, auth=(args.key, args.secret), verify=False, - timeout=args.timeout) - end_time = time.time() - api_request_time = end_time - start_time + # start_time = time.time() + response = get_with_retry('https://{args.opnsense}/api/diagnostics/traffic/top/{interface}', + headers={'Accept': 'application/json'}, auth=(args.key, args.secret), verify=False, + timeout=args.timeout) + # end_time = time.time() + # api_request_time = end_time - start_time - if response.status_code != 200: - print(f'UNKNOWN: unable to query OPNsense API for {interface}: {response.status_code}\n{response.text}') - sys.exit(nagios.UNKNOWN) if isinstance(response.json(), list): - print(F'UNKNOWN - OPNsense returned wrong datatype:\n{response.json()}') + print_icinga2_check_status(f'OPNsense returned wrong datatype:\n{response.text}', nagios.STATE_UNKNOWN) + sys.exit(nagios.STATE_UNKNOWN) for item in response.json().get(interface, {}).get('records', False): if item['address'] == args.host: traffic_data.append(item) - adjusted_sleep_duration = max(1 - api_request_time, 0) - time.sleep(adjusted_sleep_duration) + # adjusted_sleep_duration = max(1 - api_request_time, 0) + # time.sleep(adjusted_sleep_duration) if not len(traffic_data) and args.fail_empty: - print('UNKNOWN: 
Interface or host not found in OPNsense API response. Raw response:') - print(traffic_data) + print_icinga2_check_status(f'interface or host not found in OPNsense API response. Raw response:\n{traffic_data}', nagios.STATE_UNKNOWN) sys.exit(nagios.UNKNOWN) elif not len(traffic_data): # There was no traffic. @@ -121,11 +120,7 @@ def main(): 'connections': int(np.average([len(x['details']) for x in traffic_data])) } except Exception as e: - print(f'UNKNOWN: Failed to parse traffic data: "{e}"') - print(traceback.format_exc()) - print('') - print('Raw data:') - print(traffic_data) + print_icinga2_check_status(f'failed to parse traffic data: {e}\n{traceback.format_exc()}\n{traffic_data}', nagios.STATE_UNKNOWN) sys.exit(nagios.UNKNOWN) warn_b_value = (args.bandwidth * args.bandwidth_warn / 100) * 1e+6 @@ -134,74 +129,53 @@ def main(): exit_code = nagios.OK critical = [] warn = [] - ok = [] perf_data = [] output_table = [ ('Host', 'Interface', 'Rate In', 'Rate Out', 'Cumulative In', 'Cumulative Out', 'Connections', 'Status') ] - def check_b(name, value): + def check_b(name, state, value): nonlocal exit_code - if value >= crit_b_value: + if state == nagios.STATE_CRIT: critical.append((name, filesize(value))) - exit_code = nagios.CRITICAL - return '[CRITICAL]', nagios.CRITICAL - elif value >= warn_b_value: + exit_code = max(nagios.CRITICAL, exit_code) + return '[CRITICAL]', exit_code + elif state == nagios.STATE_WARN: warn.append((name, filesize(value))) - exit_code = nagios.WARNING - return '[WARNING]', nagios.WARNING - else: - ok.append((name, filesize(value))) - return '[OK]', nagios.OK + exit_code = max(nagios.STATE_WARN, exit_code) + return '[WARNING]', exit_code for name, data in check_result.items(): status = '[OK]' - in_status, in_rc = check_b('rate_in', data['rate_in']) - if in_rc >= exit_code: - status = in_status + in_state = get_state(data['rate_in'], warn_b_value, crit_b_value, 'ge') + in_status, exit_code = check_b(name, in_state, data['rate_in']) - 
out_status, out_rc = check_b('rate_out', data['rate_out']) - if out_rc >= exit_code: - status = out_status + out_state = get_state(data['rate_out'], warn_b_value, crit_b_value, 'ge') + in_status, exit_code = check_b(name, out_state, data['rate_out']) - if data['connections'] >= args.conn_critical > 0: - critical.append(('connections', data['connections'])) - exit_code = nagios.CRITICAL - status = '[CRITICAL]' - elif data['connections'] >= args.conn_warn > 0: - warn.append(('connections', data['connections'])) - exit_code = nagios.WARNING - status = '[WARNING]' - else: - ok.append(('connections', data['connections'])) + conn_state = get_state(data['connections'], args.conn_warn, args.conn_critical, 'ge') + conn_status, exit_code = check_b(name, conn_state, data['connections']) - perf_data.append(f'\'{name}_rate_in\'={int(data["rate_in"])}B;{warn_b_value};{crit_b_value};0;') - perf_data.append(f'\'{name}_rate_out\'={int(data["rate_out"])}B;{warn_b_value};{crit_b_value};0;') - perf_data.append(f'\'{name}_cumulative_in\'={int(data["cumulative_in"])}B;{warn_b_value};{crit_b_value};0;') - perf_data.append(f'\'{name}_cumulative_out\'={int(data["cumulative_out"])}B;{warn_b_value};{crit_b_value};0;') - perf_data.append(f'\'{name}_connections\'={int(data["connections"])}B;{warn_b_value};{crit_b_value};0;') + perf_data[f'{name}_rate_in'] = {'value': int(data["rate_in"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'} + perf_data[f'{name}_rate_out'] = {'value': int(data["rate_out"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'} + perf_data[f'{name}_cumulative_in'] = {'value': int(data["cumulative_in"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'} + perf_data[f'{name}_cumulative_out'] = {'value': int(data["cumulative_out"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'} + perf_data[f'{name}_connections'] = {'value': int(data["connections"]), 'warn': args.conn_warn, 'crit': args.conn_critical, 'unit': 'B'} 
output_table.append((args.host, name, filesize(data['rate_in']), filesize(data['rate_out']), filesize(data['cumulative_in']), filesize(data['cumulative_out']), data['connections'], status)) - if len(critical): - x = ['CRITICAL: '] - for i in critical: - x.append(f'{i[0]}: {i[1]}, ') - print(''.join(x).strip(', ')) - if len(warn): - x = ['WARN: '] - for i in warn: - x.append(f'{i[0]}: {i[1]}') - print(''.join(x).strip(', ')) - if not len(warn) and not len(critical): - print(f'OK: bandwidth is below {args.bandwidth} Mbps.') + if exit_code == nagios.STATE_OK: + text_result = f'bandwidth is below {args.bandwidth} Mbps.' + else: + text_result = ', '.join([*critical, *warn]) + if len(check_result) > 1: + text_result += list_to_markdown_table(output_table, align='left', seperator='!', borders=False) - print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False)) - print(f'| {" ".join(perf_data)}') + print_icinga2_check_status(text_result, exit_code, perf_data) sys.exit(exit_code) @@ -209,6 +183,5 @@ if __name__ == "__main__": try: main() except Exception as e: - print(f'UNKNOWN: exception "{e}"') - print(traceback.format_exc()) + print_icinga2_check_status(f'exception "{e}"\n{traceback.format_exc()}', nagios.STATE_UNKNOWN) sys.exit(nagios.UNKNOWN) diff --git a/check_speedtest.py b/check_speedtest.py index 335c3d3..f38aab9 100755 --- a/check_speedtest.py +++ b/check_speedtest.py @@ -7,6 +7,8 @@ import warnings from cloudflarepycli import cloudflareclass import checker.nagios as nagios +from checker.linuxfabric.base import get_state +from checker import print_icinga2_check_status def main(): @@ -34,36 +36,43 @@ def main(): warnings.simplefilter("ignore", category=RuntimeWarning) speedtest_results = cloudflareclass.cloudflare(printit=False).runalltests() - out_str = f"upload: {speedtest_results['90th_percentile_upload_speed']['value']} Mbps, download: {speedtest_results['90th_percentile_download_speed']['value']} Mbps, latency: 
{speedtest_results['latency_ms']['value']} ms, jitter: {speedtest_results['Jitter_ms']['value']} ms" - perf_data = f"'upload'={speedtest_results['90th_percentile_upload_speed']['value'] * 1e+6}B;{args.warn_up * 1e+6};{args.critical_up * 1e+6};0; 'download'={speedtest_results['90th_percentile_download_speed']['value'] * 1e+6}B;{args.warn_down * 1e+6};{args.critical_down * 1e+6};0; 'latency_ms'={speedtest_results['latency_ms']['value']}ms;{args.warn_latency};{args.critical_latency};0; 'jitter_ms'={speedtest_results['Jitter_ms']['value']}ms;;;0;" + upload_speed_state = get_state(speedtest_results['90th_percentile_upload_speed']['value'], args.warn_up, args.critical_up, _operator='le') + download_speed_state = get_state(speedtest_results['90th_percentile_download_speed']['value'], args.warn_down, args.critical_down, _operator='le') + latency_state = get_state(speedtest_results['latency_ms']['value'], args.warn_latency, args.critical_latency, _operator='ge') + exit_code = max(upload_speed_state, download_speed_state, latency_state) + text_result = f"upload: {speedtest_results['90th_percentile_upload_speed']['value']} Mbps, download: {speedtest_results['90th_percentile_download_speed']['value']} Mbps, latency: {speedtest_results['latency_ms']['value']} ms, jitter: {speedtest_results['Jitter_ms']['value']} ms" - exit_code = nagios.OK + perfdata = { + 'upload': { + 'value': speedtest_results['90th_percentile_upload_speed']['value'] * 1e+6, + 'warn': args.warn_up * 1e+6, + 'crit': args.critical_up * 1e+6, + 'min': 0, + 'unit': 'B' + }, + 'download': { + 'value': speedtest_results['90th_percentile_download_speed']['value'] * 1e+6, + 'warn': args.warn_down * 1e+6, + 'crit': args.critical_down * 1e+6, + 'min': 0, + 'unit': 'B' + }, + 'latency_ms': { + 'value': speedtest_results['latency_ms']['value'], + 'warn': args.warn_latency, + 'crit': args.critical_latency, + 'min': 0, + 'unit': 'ms' + }, + 'jitter_ms': { + 'value': speedtest_results['Jitter_ms']['value'], + 'min': 0, + 
'unit': 'ms' + } - if speedtest_results['90th_percentile_upload_speed']['value'] <= args.critical_up and exit_code < nagios.CRITICAL: - exit_code = nagios.CRITICAL - elif speedtest_results['90th_percentile_upload_speed']['value'] <= args.warn_up and exit_code < nagios.WARNING: - exit_code = nagios.WARNING + } - if speedtest_results['90th_percentile_download_speed']['value'] <= args.critical_down and exit_code < nagios.CRITICAL: - exit_code = nagios.CRITICAL - elif speedtest_results['90th_percentile_download_speed']['value'] <= args.warn_down and exit_code < nagios.WARNING: - exit_code = nagios.WARNING - - if speedtest_results['latency_ms']['value'] >= args.warn_latency and exit_code < nagios.CRITICAL: - exit_code = nagios.CRITICAL - elif speedtest_results['latency_ms']['value'] >= args.warn_latency and exit_code < nagios.WARNING: - exit_code = nagios.WARNING - - if exit_code == nagios.OK: - status_str = 'OK' - elif exit_code == nagios.WARNING: - status_str = 'WARN' - elif exit_code == nagios.CRITICAL: - status_str = 'CRITICAL' - else: - status_str = 'UNKNOWN' - - print(f'{status_str} - {out_str} |{perf_data}') + print_icinga2_check_status(text_result, exit_code, perfdata) sys.exit(exit_code) diff --git a/checker/__init__.py b/checker/__init__.py index 8b13789..860b257 100644 --- a/checker/__init__.py +++ b/checker/__init__.py @@ -1 +1,2 @@ - +from .print import print_icinga2_check_status, dict_to_perfdata, create_description_list +from .markdown import list_to_markdown_table diff --git a/checker/http.py b/checker/http.py new file mode 100644 index 0000000..be11596 --- /dev/null +++ b/checker/http.py @@ -0,0 +1,30 @@ +import sys +from time import sleep + +import requests + +from . import nagios +from .print import print_icinga2_check_status + + +def get_with_retry(url, retries=3, delay=1, **kwargs): + """ + Wrapper function for requests.get() with a retry mechanism. 
+ + :param url: URL to send the GET request + :param retries: Number of retries in case of HTTP failures (default: 3) + :param delay: Time delay between retries in seconds (default: 1) + :param kwargs: Additional keyword arguments for requests.get() + :return: Response object + """ + for i in range(retries): + try: + response = requests.get(url, **kwargs) + response.raise_for_status() + return response + except requests.exceptions.RequestException as e: + if i == retries - 1: + # raise e + print_icinga2_check_status(f'HTTP request failed after {i} retries: {url}\n{e}', nagios.STATE_UNKNOWN) + sys.exit(nagios.STATE_UNKNOWN) + sleep(delay) diff --git a/checker/linuxfabric/base.py b/checker/linuxfabric/base.py new file mode 100644 index 0000000..d66db23 --- /dev/null +++ b/checker/linuxfabric/base.py @@ -0,0 +1,624 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst + +"""Provides very common every-day functions. +""" + +__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland' +__version__ = '2023051201' + +import collections +import numbers +import operator +import os +import sys +from traceback import format_exc # pylint: disable=C0413 + +from ..nagios import STATE_CRIT, STATE_OK, STATE_UNKNOWN, STATE_WARN + +WINDOWS = os.name == "nt" +LINUX = sys.platform.startswith("linux") +X86_64 = sys.maxsize > 2 ** 32 + + +def contine_or_exit(result, state=STATE_UNKNOWN): + """Continue or Exit (CoE) + + This is useful if calling complex library functions in your checks + `main()` function. Don't use this in functions. + + If a more complex library function, for example `lib.url.fetch()` fails, it + returns `(False, 'the reason why I failed')`, otherwise `(True, + 'this is my result'). 
This forces you to do some error handling. + To keep things simple, use `result = lib.base.coe(lib.url.fetch(...))`. + If `fetch()` fails, your plugin will exit with STATE_UNKNOWN (default) and + print the original error message. Otherwise your script just goes on. + + The use case in `main()` - without `coe`: + + >>> success, html = lib.url.fetch(URL) + >>> if not success: + >>> print(html) # contains the error message here + >>>> exit(STATE_UNKNOWN) + + Or simply: + + >>> html = lib.base.coe(lib.url.fetch(URL)) + + Parameters + ---------- + result : tuple + The result from a function call. + result[0] = expects the function return code (True on success) + result[1] = expects the function result (could be of any type) + state : int + If result[0] is False, exit with this state. + Default: 3 (which is STATE_UNKNOWN) + + Returns + ------- + any type + The result of the inner function call (result[1]). + """ + if result[0]: + # success + return result[1] + print(result[1]) + sys.exit(state) + + +def see_you(): + """See you (cu) + + Prints a Stacktrace (replacing "<" and ">" to be printable in Web-GUIs), and exits with + STATE_UNKNOWN. + """ + print(format_exc().replace("<", "'").replace(">", "'")) + sys.exit(STATE_UNKNOWN) + + +def get_perfdata(label, value, uom=None, warn=None, crit=None, _min=None, _max=None): + """Returns 'label'=value[UOM];[warn];[crit];[min];[max] + """ + msg = "'{}'={}".format(label, value) + if uom is not None: + msg += uom + msg += ';' + if warn is not None: + msg += str(warn) + msg += ';' + if crit is not None: + msg += str(crit) + msg += ';' + if _min is not None: + msg += str(_min) + msg += ';' + if _max is not None: + msg += str(_max) + msg += ' ' + return msg + + +def get_state(value, warn, crit, _operator='ge'): + """Returns the STATE by comparing `value` to the given thresholds using + a comparison `_operator`. `warn` and `crit` threshold may also be `None`. 
+ + >>> get_state(15, 10, 20, 'ge') + 1 (STATE_WARN) + >>> get_state(10, 10, 20, 'gt') + 0 (STATE_OK) + + Parameters + ---------- + value : float + Numeric value + warn : float + Numeric warning threshold + crit : float + Numeric critical threshold + _operator : string + `eq` = equal to + `ge` = greater or equal + `gt` = greater than + `le` = less or equal + `lt` = less than + `ne` = not equal to + `range` = match range + + Returns + ------- + int + `STATE_OK`, `STATE_WARN` or `STATE_CRIT` + """ + # make sure to use float comparison + value = float(value) + if _operator == 'ge': + if crit is not None: + if value >= float(crit): + return STATE_CRIT + if warn is not None: + if value >= float(warn): + return STATE_WARN + return STATE_OK + + if _operator == 'gt': + if crit is not None: + if value > float(crit): + return STATE_CRIT + if warn is not None: + if value > float(warn): + return STATE_WARN + return STATE_OK + + if _operator == 'le': + if crit is not None: + if value <= float(crit): + return STATE_CRIT + if warn is not None: + if value <= float(warn): + return STATE_WARN + return STATE_OK + + if _operator == 'lt': + if crit is not None: + if value < float(crit): + return STATE_CRIT + if warn is not None: + if value < float(warn): + return STATE_WARN + return STATE_OK + + if _operator == 'eq': + if crit is not None: + if value == float(crit): + return STATE_CRIT + if warn is not None: + if value == float(warn): + return STATE_WARN + return STATE_OK + + if _operator == 'ne': + if crit is not None: + if value != float(crit): + return STATE_CRIT + if warn is not None: + if value != float(warn): + return STATE_WARN + return STATE_OK + + if _operator == 'range': + if crit is not None: + if not contine_or_exit(match_range(value, crit)): + return STATE_CRIT + if warn is not None: + if not contine_or_exit(match_range(value, warn)): + return STATE_WARN + return STATE_OK + + return STATE_UNKNOWN + + +def get_table(data, cols, header=None, strip=True, sort_by_key=None, 
sort_order_reverse=False): + """Takes a list of dictionaries, formats the data, and returns + the formatted data as a text table. + + Required Parameters: + data - Data to process (list of dictionaries). (Type: List) + cols - List of cols in the dictionary. (Type: List) + + Optional Parameters: + header - The table header. (Type: List) + strip - Strip/Trim values or not. (Type: Boolean) + sort_by_key - The key to sort by. (Type: String) + sort_order_reverse - Default sort order is ascending, if + True sort order will change to descending. (Type: bool) + + Inspired by + https://www.calazan.com/python-function-for-displaying-a-list-of-dictionaries-in-table-format/ + """ + if not data: + return '' + + # Sort the data if a sort key is specified (default sort order is ascending) + if sort_by_key: + data = sorted(data, + key=operator.itemgetter(sort_by_key), + reverse=sort_order_reverse) + + # If header is not empty, create a list of dictionary from the cols and the header and + # insert it before first row of data + if header: + header = dict(zip(cols, header)) + data.insert(0, header) + + # prepare data: decode from (mostly) UTF-8 to Unicode, optionally strip values and get + # the maximum length per column + column_widths = collections.OrderedDict() + for idx, row in enumerate(data): + for col in cols: + try: + if strip: + data[idx][col] = str(row[col]).strip() + else: + data[idx][col] = str(row[col]) + except: + return 'Unknown column "{}"'.format(col) + # get the maximum length + try: + column_widths[col] = max(column_widths[col], len(data[idx][col])) + except: + column_widths[col] = len(data[idx][col]) + + if header: + # Get the length of each column and create a '---' divider based on that length + header_divider = [] + for col, width in column_widths.items(): + header_divider.append('-' * width) + + # Insert the header divider below the header row + header_divider = dict(zip(cols, header_divider)) + data.insert(1, header_divider) + + # create the output + table = 
'' + cnt = 0 + for row in data: + tmp = '' + for col, width in column_widths.items(): + if cnt != 1: + tmp += '{:<{}} ! '.format(row[col], width) + else: + # header row + tmp += '{:<{}}-+-'.format(row[col], width) + cnt += 1 + table += tmp[:-2] + '\n' + + return table + + +def get_worst(state1, state2): + """Compares state1 to state2 and returns result based on the following + STATE_OK < STATE_UNKNOWN < STATE_WARNING < STATE_CRITICAL + It will prioritize any non-OK state. + + Note that numerically the above does not hold. + """ + state1 = int(state1) + state2 = int(state2) + if STATE_CRIT in [state1, state2]: + return STATE_CRIT + if STATE_WARN in [state1, state2]: + return STATE_WARN + if STATE_UNKNOWN in [state1, state2]: + return STATE_UNKNOWN + return STATE_OK + + +def guess_type(v, consumer='python'): + """Guess the type of a value (None, int, float or string) for different types of consumers + (Python, SQLite etc.). + For Python, use isinstance() to check for example if a number is an integer. + + >>> guess_type('1') + 1 + >>> guess_type('1', 'sqlite') + 'integer' + >>> guess_type('1.0') + 1.0 + >>> guess_type('1.0', 'sqlite') + 'real' + >>> guess_type('abc') + 'abc' + >>> guess_type('abc', 'sqlite') + 'text' + >>> + >>> value_type = lib.base.guess_type(value) + >>> if isinstance(value_type, int) or isinstance(value_type, float): + >>> ... + """ + if consumer == 'python': + if v is None: + return None + try: + return int(v) + except ValueError: + try: + return float(v) + except ValueError: + return str(v) + + if consumer == 'sqlite': + if v is None: + return 'string' + try: + int(v) + return 'integer' + except ValueError: + try: + float(v) + return 'real' + except ValueError: + return 'text' + + +def is_empty_list(l): + """Check if a list only contains either empty elements or whitespace + """ + return all(s == '' or s.isspace() for s in l) + + +def is_numeric(value): + """Return True if value is really numeric (int, float, whatever). 
+ + >>> is_numeric(+53.4) + True + >>> is_numeric('53.4') + False + """ + return isinstance(value, numbers.Number) + + +def lookup_lod(dicts, key, needle, default=None): + """Search in a list of dictionaries ("lod)" for a value in a given dict key. + Return a default if not found. + + >>> dicts = [ + ... { "name": "Tom", "age": 10 }, + ... { "name": "Mark", "age": 5 }, + ... { "name": "Pam", "age": 7 }, + ... { "name": "Dick", "age": 12 } + ... ] + >>> lookup_lod(dicts, 'name', 'Pam') + {'name': 'Pam', 'age': 7} + >>> lookup_lod(dicts, 'name', 'Pamela') + >>> + """ + return next((item for item in dicts if item[key] == needle), None) + + +def match_range(value, spec): + """Decides if `value` is inside/outside the threshold spec. + + Parameters + ---------- + spec : str + Nagios range specification + value : int or float + Numeric value + + Returns + ------- + bool + `True` if `value` is inside the bounds for a non-inverted + `spec`, or outside the bounds for an inverted `spec`. Otherwise `False`. 
+ + Inspired by https://github.com/mpounsett/nagiosplugin/blob/master/nagiosplugin/range.py + """ + + def parse_range(spec): + """ + Inspired by https://github.com/mpounsett/nagiosplugin/blob/master/nagiosplugin/range.py + + +--------+-------------------+-------------------+--------------------------------+ + | -w, -c | OK if result is | WARN/CRIT if | lib.base.parse_range() returns | + +--------+-------------------+-------------------+--------------------------------+ + | 10 | in (0..10) | not in (0..10) | (0, 10, False) | + +--------+-------------------+-------------------+--------------------------------+ + | -10 | in (-10..0) | not in (-10..0) | (0, -10, False) | + +--------+-------------------+-------------------+--------------------------------+ + | 10: | in (10..inf) | not in (10..inf) | (10, inf, False) | + +--------+-------------------+-------------------+--------------------------------+ + | : | in (0..inf) | not in (0..inf) | (0, inf, False) | + +--------+-------------------+-------------------+--------------------------------+ + | ~:10 | in (-inf..10) | not in (-inf..10) | (-inf, 10, False) | + +--------+-------------------+-------------------+--------------------------------+ + | 10:20 | in (10..20) | not in (10..20) | (10, 20, False) | + +--------+-------------------+-------------------+--------------------------------+ + | @10:20 | not in (10..20) | in 10..20 | (10, 20, True) | + +--------+-------------------+-------------------+--------------------------------+ + | @~:20 | not in (-inf..20) | in (-inf..20) | (-inf, 20, True) | + +--------+-------------------+-------------------+--------------------------------+ + | @ | not in (0..inf) | in (0..inf) | (0, inf, True) | + +--------+-------------------+-------------------+--------------------------------+ + """ + + def parse_atom(atom, default): + if atom == '': + return default + if '.' 
in atom: + return float(atom) + return int(atom) + + if spec is None or str(spec).lower() == 'none': + return (True, None) + if not isinstance(spec, str): + spec = str(spec) + invert = False + if spec.startswith('@'): + invert = True + spec = spec[1:] + if ':' in spec: + try: + start, end = spec.split(':') + except: + return (False, 'Not using range definition correctly') + else: + start, end = '', spec + if start == '~': + start = float('-inf') + else: + start = parse_atom(start, 0) + end = parse_atom(end, float('inf')) + if start > end: + return (False, 'Start %s must not be greater than end %s' % (start, end)) + return (True, (start, end, invert)) + + if spec is None or str(spec).lower() == 'none': + return (True, True) + success, result = parse_range(spec) + if not success: + return (success, result) + start, end, invert = result + if isinstance(value, (str, bytes)): + value = float(value.replace('%', '')) + if value < start: + return (True, False ^ invert) + if value > end: + return (True, False ^ invert) + return (True, True ^ invert) + + +def over_and_out(msg, state=STATE_OK, perfdata='', always_ok=False): + """Over and Out (OaO) + + Print the stripped plugin message. If perfdata is given, attach it + by `|` and print it stripped. Exit with `state`, or with STATE_OK (0) if + `always_ok` is set to `True`. + """ + if perfdata: + print(msg.strip() + '|' + perfdata.strip()) + else: + print(msg.strip()) + if always_ok: + sys.exit(STATE_OK) + sys.exit(state) + + +def smartcast(value): + """Returns the value converted to float if possible, else string, else the + uncasted value. 
+ """ + for test in [float, str]: + try: + return test(value) + except ValueError: + continue + # No match + return value + + +def sort(array, reverse=True, sort_by_key=False): + """Sort a simple 1-dimensional dictionary + """ + if isinstance(array, dict): + if not sort_by_key: + return sorted(array.items(), key=lambda x: x[1], reverse=reverse) + return sorted(array.items(), key=lambda x: str(x[0]).lower(), reverse=reverse) + return array + + +def state2str(state, empty_ok=True, prefix='', suffix=''): + """Return the state's string representation. + The square brackets around the state cause icingaweb2 to color the state. + + >> lib.base.state2str(2) + '[CRIT]' + >>> state2str(0) + '' + >>> state2str(0, empty_ok=False) + '[OK]' + >>> state2str(0, empty_ok=False, suffix=' ') + '[OK] ' + >>> state2str(0, empty_ok=False, prefix=' (', suffix=')') + ' ([OK])' + """ + state = int(state) + if state == STATE_OK and empty_ok: + return '' + if state == STATE_OK and not empty_ok: + return '{}[OK]{}'.format(prefix, suffix) + if state == STATE_WARN: + return '{}[WARNING]{}'.format(prefix, suffix) + if state == STATE_CRIT: + return '{}[CRITICAL]{}'.format(prefix, suffix) + if state == STATE_UNKNOWN: + return '{}[UNKNOWN]{}'.format(prefix, suffix) + return state + + +def str2state(string, ignore_error=True): + """Return the numeric state based on a (case-insensitive) string. + Matches up to the first four characters. 
+ + >>> str2state('ok') + 0 + >>> str2state('okidoki') + 3 + >>> str2state('okidoki', ignore_error=False) + None + >>> str2state('war') + 3 + >>> str2state('warn') + 1 + >>> str2state('Warnung') + 1 + >>> str2state('CrITical') + 2 + >>> str2state('UNKNOWN') + 3 + >>> str2state('gobbledygook') + 3 + >>> str2state('gobbledygook', ignore_error=False) + None + """ + string = str(string).lower()[0:4] + if string == 'ok': + return STATE_OK + if string == 'warn': + return STATE_WARN + if string == 'crit': + return STATE_CRIT + if string == 'unkn': + return STATE_UNKNOWN + if ignore_error: + return STATE_UNKNOWN + return None + + +def sum_dict(dict1, dict2): + """Sum up two dictionaries, maybe with different keys. + + >>> sum_dict({'in': 100, 'out': 10}, {'in': 50, 'error': 5, 'uuid': '1234-xyz'}) + {'in': 150, 'error': 5, 'out': 10} + """ + total = {} + for key, value in dict1.items(): + if not is_numeric(value): + continue + if key in total: + total[key] += value + else: + total[key] = value + for key, value in dict2.items(): + if not is_numeric(value): + continue + if key in total: + total[key] += value + else: + total[key] = value + return total + + +def sum_lod(mylist): + """Sum up a list of (simple 1-dimensional) dictionary items. + + sum_lod([{'in': 100, 'out': 10}, {'in': 50, 'out': 20}, {'error': 5, 'uuid': '1234-xyz'}]) + >>> {'in': 150, 'out': 30, 'error': 5} + """ + total = {} + for mydict in mylist: + for key, value in mydict.items(): + if not is_numeric(value): + continue + if key in total: + total[key] += value + else: + total[key] = value + return total diff --git a/checker/linuxfabric/human.py b/checker/linuxfabric/human.py new file mode 100644 index 0000000..072a898 --- /dev/null +++ b/checker/linuxfabric/human.py @@ -0,0 +1,222 @@ +#! 
/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst + +"""Functions to convert raw numbers, times etc. to a human readable representation (and sometimes + back). +""" + +__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland' +__version__ = '2023051201' + +import math + + +def bits2human(n, _format='%(value).1f%(symbol)s'): + """Converts n bits to a human readable format. + + >>> bits2human(8191) + '1023.9B' + >>> bits2human(8192) + '1.0KiB' + """ + symbols = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB') + prefix = {} + prefix['B'] = 8 + for i, s in enumerate(symbols[1:]): + prefix[s] = 1024 ** (i + 1) * 8 + for symbol in reversed(symbols): + if n >= prefix[symbol]: + value = float(n) / prefix[symbol] # pylint: disable=W0641 + return _format % locals() + return _format % dict(symbol=symbols[0], value=n) + + +def bps2human(n, _format='%(value).1f%(symbol)s'): + """Converts n bits per scond to a human readable format. + + >>> bps2human(72000000) + '72Mbps' + """ + symbols = ('bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps', 'Pbps', 'Ebps', 'Zbps', 'Ybps') + prefix = {} + for i, s in enumerate(symbols[1:]): + prefix[s] = 1000 ** (i + 1) + for symbol in reversed(symbols[1:]): + if n >= prefix[symbol]: + value = float(n) / prefix[symbol] # pylint: disable=W0641 + return _format % locals() + return _format % dict(symbol=symbols[0], value=n) + + +def bytes2human(n, _format='%(value).1f%(symbol)s'): + """Converts n bytes to a human readable format. 
+ + >>> bytes2human(1023) + '1023.0B' + >>> bytes2human(1024) + '1.0KiB' + + https://github.com/giampaolo/psutil/blob/master/psutil/_common.py + """ + symbols = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB') + prefix = {} + for i, s in enumerate(symbols[1:]): + # Returns 1 with the bits shifted to the left by (i + 1)*10 places + # (and new bits on the right-hand-side are zeros). This is the same + # as multiplying x by 2**y. + prefix[s] = 1 << (i + 1) * 10 + for symbol in reversed(symbols[1:]): + if n >= prefix[symbol]: + value = float(n) / prefix[symbol] # pylint: disable=W0641 + return _format % locals() + return _format % dict(symbol=symbols[0], value=n) + + +def human2bytes(string, binary=True): + """Converts a string such as '3.072GiB' to 3298534883 bytes. If "binary" is set to True + (default due to Microsoft), it will use powers of 1024, otherwise powers of 1000 (decimal). + Returns 0 on failure. + + Works with: + * 3.072GiB (always multiplied by 1024) + * 3.072GB (multiplied by 1024 if binary == True, else multiplied by 1000) + * 3.072G (multiplied by 1024 if binary == True, else multiplied by 1000) + """ + try: + string = string.lower() + if 'kib' in string: + return int(float(string.replace('kib', '').strip()) * 1024) + if 'mib' in string: + return int(float(string.replace('mib', '').strip()) * 1024 ** 2) + if 'gib' in string: + return int(float(string.replace('gib', '').strip()) * 1024 ** 3) + if 'tib' in string: + return int(float(string.replace('tib', '').strip()) * 1024 ** 4) + if 'pib' in string: + return int(float(string.replace('pib', '').strip()) * 1024 ** 5) + + if 'k' in string: # matches "kb" or "k" + string = string.replace('kb', '').replace('k', '').strip() + if binary: + return int(float(string) * 1024) + return int(float(string) * 1000) + if 'm' in string: # matches "mb" or "m" + string = string.replace('mb', '').replace('m', '').strip() + if binary: + return int(float(string) * 1024 ** 2) + return int(float(string) * 1000 ** 
2) + if 'g' in string: # matches "gb" or "g" + string = string.replace('gb', '').replace('g', '').strip() + if binary: + return int(float(string) * 1024 ** 3) + return int(float(string) * 1000 ** 3) + if 't' in string: # matches "tb" or "t" + string = string.replace('tb', '').replace('t', '').strip() + if binary: + return int(float(string) * 1024 ** 4) + return int(float(string) * 1000 ** 4) + if 'p' in string: # matches "pb" or "p" + string = string.replace('pb', '').replace('p', '').strip() + if binary: + return int(float(string) * 1024 ** 5) + return int(float(string) * 1000 ** 5) + if 'b' in string: + return int(float(string.replace('b', '').strip())) + return 0 + except: + return 0 + + +def number2human(n): + """ + >>> number2human(123456.8) + '123K' + >>> number2human(123456789.0) + '123M' + >>> number2human(9223372036854775808) + '9.2E' + """ + # according to the SI Symbols at + # https://en.wikipedia.org/w/index.php?title=Names_of_large_numbers§ion=5#Extensions_of_the_standard_dictionary_numbers + millnames = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'] + try: + n = float(n) + except: + return n + millidx = max(0, min(len(millnames) - 1, + int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3)))) + return '{:.1f}{}'.format(n / 10 ** (3 * millidx), millnames[millidx]) + + +def seconds2human(seconds, keep_short=True, full_name=False): + """Returns a human readable time range string for a number of seconds. 
+ + >>> seconds2human(0.125) + '0.12s' + >>> seconds2human(1) + '1s' + >>> seconds2human(59) + '59s' + >>> seconds2human(60) + '1m' + >>> seconds2human(61) + '1m 1s' + >>> seconds2human(1387775) + '2W 2D' + >>> seconds2human('1387775') + '2W 2D' + >>> seconds2human('1387775', full_name=True) + '2weeks 2days' + >>> seconds2human(1387775, keep_short=False, full_name=True) + '2weeks 2days 1hour 29minutes 35seconds' + """ + seconds = float(seconds) + + if full_name: + symbols = ( + ('years', 60 * 60 * 24 * 365), + ('months', 60 * 60 * 24 * 30), + ('weeks', 60 * 60 * 24 * 7), + ('days', 60 * 60 * 24), + ('hours', 60 * 60), + ('minutes', 60), + ('seconds', 1), + ('millisecs', 1e-3), + ('microsecs', 1e-6), + ('nanosecs', 1e-9), + ('picosecs', 1e-12), + ) + else: + symbols = ( + ('Y', 60 * 60 * 24 * 365), + ('M', 60 * 60 * 24 * 30), + ('W', 60 * 60 * 24 * 7), + ('D', 60 * 60 * 24), + ('h', 60 * 60), + ('m', 60), + ('s', 1), + ('ms', 1e-3), + ('us', 1e-6), + ('ns', 1e-9), + ('ps', 1e-12), + ) + + result = [] + for name, count in symbols: + value = seconds // count + if value: + seconds -= value * count + if full_name and value == 1: + name = name.rstrip('s') # "days" becomes "day" + result.append('{:.0f}{}'.format(value, name)) + + if len(result) > 2 and keep_short: + return ' '.join(result[:2]) + return ' '.join(result) diff --git a/checker/nagios.py b/checker/nagios.py index ad5b53c..974cea5 100644 --- a/checker/nagios.py +++ b/checker/nagios.py @@ -1,4 +1,5 @@ -UNKNOWN = -1 -OK = 0 -WARNING = 1 -CRITICAL = 2 +# TODO: remove non STATE_ vars +UNKNOWN = STATE_UNKNOWN = -1 +OK = STATE_OK = 0 +WARNING = STATE_WARN = 1 +CRITICAL = STATE_CRIT = 2 diff --git a/checker/print.py b/checker/print.py new file mode 100644 index 0000000..7101b0d --- /dev/null +++ b/checker/print.py @@ -0,0 +1,109 @@ +def create_description_list(data: list): + """ + Create a Description List HTML element based on the input list. 
+ + Args: + data (list): A list containing lists, where each inner list has two elements: + the description term and the description details. + + Returns: + str: A string containing the HTML representation of the description list. + + Example: + data = [ + ["Description Term 1", "Description Details 1"], + ["Description Term 2", "Description Details 2"], + ["Description Term 3", "Description Details 3"], + ] + html = create_description_list(data) + print(html) + """ + html = "
<dl>" + for item in data: + html += f"<dt>{item[0]}</dt><dd>{item[1]}</dd>" + html += "</dl>
" + return html + + +def dict_to_perfdata(data: dict, replace_dashes=True): + """ + Transforms a dictionary into Icinga2 perfdata format. + + The input dictionary should have the following structure: + { + "item1": { + "value": any_type (required), + "warn": any_type (optional), + "crit": any_type (optional), + "min": any_type (optional), + "max": any_type (optional), + "unit": string (optional), + }, + ... + } + + :param data: A dictionary containing the perfdata items and their values. + :type data: dict + :param replace_dashes: If True, replace dashes in perfdata names/keys with underscores. Default is True. + :type replace_dashes: bool + :return: A string in Icinga2 perfdata format. + :rtype: str + """ + perfdata = [] + for key, values in data.copy().items(): + # Remove values that are None + for k, v in values.copy().items(): + if v is None: + del values[k] + + if replace_dashes: + key = key.replace('-', '_') + item = f"{key.strip()}={values['value']}{values.get('unit', '')}" + for k in ['warn', 'crit', 'min', 'max']: + if k in values: + item += f";{values[k]}" + else: + item += ";" + perfdata.append(item) + return " ".join(perfdata) + + +def print_icinga2_check_status(text_result: str, return_code: int, perfdata=None): + """ + Prints the status of an Icinga2 check with the given text result, return code, and perfdata. + + Example: + sample_text_result = "check is good, time 120s" + sample_return_code = 0 + sample_perfdata = { + "time": { + "value": 120, + "warn": 3, + "crit": 100, + "min": 0, + "max": 200, + }, + } + print_icinga2_check_status(sample_text_result, sample_return_code, sample_perfdata) + + :param text_result: The text result of the check command. + :type text_result: str + :param return_code: The return code of the check (UNKNOWN=-1, OK=0, WARNING=1, CRITICAL=2). + :type return_code: int + :param perfdata: A dictionary containing the perfdata items and their values, defaults to an empty dictionary. 
+ :type perfdata: dict, optional + :raises ValueError: If an invalid return code is passed. + """ + if perfdata is None: + perfdata = {} + status_codes = { + -1: "UNKNOWN", + 0: "OK", + 1: "WARNING", + 2: "CRITICAL", + } + if return_code not in status_codes: + raise ValueError(f"Invalid return code: {return_code}") + status = status_codes[return_code] + perfdata_str = f' | {dict_to_perfdata(perfdata)}' if perfdata else '' + print(f"{status} - {text_result.strip()}{perfdata_str}")