working on refactor

This commit is contained in:
Cyberes 2023-06-15 11:00:41 -06:00
parent 847999f43b
commit 3df7c5d1ef
Signed by: cyberes
GPG Key ID: 6B4A33836A9500FE
12 changed files with 1156 additions and 395 deletions

View File

@ -141,11 +141,11 @@ if [ -z "$url" ] || [ $# -eq 0 ]; then
fi fi
proxyarg="" proxyarg=""
if [ ! -z $proxy ] ; then if [ -n $proxy ] ; then
proxyarg=" -x "$proxy" " proxyarg=" -x "$proxy" "
fi fi
headerarg="" headerarg=""
if [ ! -z "$header" ] ; then if [ -n "$header" ] ; then
headerarg=' -H "'$header'" ' headerarg=' -H "'$header'" '
fi fi
followarg="" followarg=""
@ -162,7 +162,7 @@ if [ $cookies -eq 1 ] ; then
cookiesarg=" -c ${COOKIE_JAR_TEMP_PATH} -b ${COOKIE_JAR_TEMP_PATH}" cookiesarg=" -c ${COOKIE_JAR_TEMP_PATH} -b ${COOKIE_JAR_TEMP_PATH}"
fi fi
bodyarg="" bodyarg=""
if [ ! -z $body ]; then if [ -n $body ]; then
body=$(echo $body| sed "s/\"/\\\\\"/g") body=$(echo $body| sed "s/\"/\\\\\"/g")
bodyarg=" --data \""$body"\"" bodyarg=" --data \""$body"\""
if [ $encodeurl -eq 1 ]; then if [ $encodeurl -eq 1 ]; then

View File

@ -8,14 +8,15 @@ import traceback
import psutil import psutil
import checker.nagios as nagios import checker.nagios as nagios
from checker.markdown import list_to_markdown_table from checker.linuxfabric.base import get_state
from checker import list_to_markdown_table, dict_to_perfdata, print_icinga2_check_status
parser = argparse.ArgumentParser(description='Check network interface bandwidth utilization.') parser = argparse.ArgumentParser(description='Check network interface bandwidth utilization.')
parser.add_argument('--bandwidth', type=float, required=True, help='Bandwidth speed in Mbps. Used to calculate percentage.') parser.add_argument('--bandwidth', type=float, default=0, help='Bandwidth speed in Mbps. Used to calculate percentage. Default is 0 which disables warning and critical levels.')
parser.add_argument('--critical', type=int, default=75, help='Critical if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--critical', type=int, default=75, help='Critical if percent of bandwidth usage is greater than or equal to this.')
parser.add_argument('--warn', type=int, default=50, help='Warning if percent of bandwidth usage is greater than or equal to this.') parser.add_argument('--warn', type=int, default=50, help='Warning if percent of bandwidth usage is greater than or equal to this.')
parser.add_argument('--max', type=int, default=None, help='Set the max value the bandwidth can be. Useful for graphs and whatever.') parser.add_argument('--max', type=int, default=None, help='Set the max value the bandwidth can be. Useful for graphs and whatever.')
parser.add_argument('--ignore', nargs='*', default=[], help='Interface names to ignore, separated by a space.') parser.add_argument('--ignore', nargs='*', default=['lo'], help='Interface names to ignore, separated by a space. Default: lo')
parser.add_argument('--ignore-re', default=None, help='Regex matching interface names to ignore.') parser.add_argument('--ignore-re', default=None, help='Regex matching interface names to ignore.')
args = parser.parse_args() args = parser.parse_args()
@ -71,8 +72,8 @@ def calculate_network_traffic(interface, interval=1):
def main(): def main():
data = [] data = []
warn_value = (args.bandwidth * args.warn / 100) warn_value = (args.bandwidth * args.warn / 100) if args.bandwidth else 0
crit_value = (args.bandwidth * args.critical / 100) crit_value = (args.bandwidth * args.critical / 100) if args.bandwidth else 0
# Get network interface statistics # Get network interface statistics
net_io_counters = psutil.net_io_counters(pernic=True) net_io_counters = psutil.net_io_counters(pernic=True)
@ -89,46 +90,51 @@ def main():
critical = [] critical = []
warn = [] warn = []
ok = [] ok = []
perf_data = [] perfdata = {}
for i in range(len(data)): for i in range(len(data)):
interface = data[i][0] interface = data[i][0]
bandwidth_utilization = data[i][3] bandwidth_utilization = data[i][3]
if bandwidth_utilization >= crit_value: state_code = get_state(bandwidth_utilization, warn_value, crit_value, 'lt')
if state_code == nagios.STATE_CRIT:
critical.append(interface) critical.append(interface)
state = 'critical' state = 'critical'
exit_code = nagios.CRITICAL exit_code = max(exit_code, nagios.CRITICAL)
elif bandwidth_utilization >= warn_value: elif state_code == nagios.STATE_WARN:
warn.append(interface) warn.append(interface)
state = 'warning' state = 'warning'
if exit_code < nagios.WARNING: exit_code = max(exit_code, nagios.WARNING)
exit_code = nagios.WARNING
else: else:
ok.append(interface) ok.append(interface)
state = 'ok' state = 'ok'
data[i][4] = f'[{state.upper()}]'
perf_data.append(f'{interface}={round(bandwidth_utilization, 2)}Mbps;{warn_value};{crit_value};{f"0;{args.max};" if args.max else ""} ')
# Print the status data[i][4] = f'[{state.upper()}]'
perfdata.update({
interface: {
'value': round(bandwidth_utilization, 2),
'warn': warn_value,
'crit': crit_value,
'min': 0 if args.max else None,
'unit': 'Mbps'
}
})
if exit_code == nagios.CRITICAL: if exit_code == nagios.CRITICAL:
status = 'CRITICAL'
listed_interfaces = [*critical, *warn] listed_interfaces = [*critical, *warn]
elif exit_code == nagios.WARNING: elif exit_code == nagios.WARNING:
status = 'WARNING'
listed_interfaces = warn listed_interfaces = warn
else: else:
status = 'OK'
listed_interfaces = ok listed_interfaces = ok
listed_glances = [] listed_glances = []
for interface in listed_interfaces: for interface in listed_interfaces:
listed_glances.append(f'{interface}: {round(get_interface_data(interface, data)[3], 2)}Mbps') listed_glances.append(f'{interface}: {round(get_interface_data(interface, data)[3], 2)} Mbps')
print(f'{status} - {", ".join(listed_glances)}')
data = [(x[0], f'{round(x[3], 2)} Mbps', x[4]) for x in data] data = [(x[0], f'{round(x[3], 2)} Mbps', x[4]) for x in data]
data.insert(0, ('Interface', 'Bandwidth', 'State')) data.insert(0, ('Interface', 'Bandwidth', 'State'))
print(list_to_markdown_table(data, align='left', seperator='!', borders=False))
print(f'|{"".join(perf_data)}') print_icinga2_check_status(f'{", ".join(listed_glances)}\n{list_to_markdown_table(data, align="left", seperator="!", borders=False)}', exit_code, perfdata)
sys.exit(exit_code) sys.exit(exit_code)

View File

@ -1,226 +0,0 @@
#!/usr/bin/env python3
import argparse
import asyncio
import json
import os
import sys
import tempfile
import traceback
import urllib
import numpy as np
import requests
from PIL import Image
from nio import AsyncClient, AsyncClientConfig, LoginResponse, RoomSendError
from urllib3.exceptions import InsecureRequestWarning
from checker import nagios
from checker.synapse_client import send_image, write_login_details_to_disk
parser = argparse.ArgumentParser(description='')
parser.add_argument('--user', required=True, help='User ID for the bot.')
parser.add_argument('--pw', required=True, help='Password for the bot.')
parser.add_argument('--hs', required=True, help='Homeserver of the bot.')
parser.add_argument('--admin-endpoint', required=True, help='Admin endpoint that will be called to purge media for this user.')
parser.add_argument('--room', required=True, help='The room the bot should send its test messages in.')
parser.add_argument('--check-domain', required=True, help='The domain that should be present.')
parser.add_argument('--media-cdn-redirect', default='true', help='If set, the server must respond with a redirect to the media CDN domain.')
parser.add_argument('--required-headers', nargs='*', help="If these headers aren't set to the correct value, critical. Use the format 'key=value")
parser.add_argument('--auth-file', help="File to cache the bot's login details to.")
parser.add_argument('--timeout', type=float, default=90, help='Request timeout limit.')
parser.add_argument('--warn', type=float, default=2.0, help='Manually set warn level.')
parser.add_argument('--crit', type=float, default=2.5, help='Manually set critical level.')
args = parser.parse_args()
if args.media_cdn_redirect == 'true':
args.media_cdn_redirect = True
elif args.media_cdn_redirect == 'false':
args.media_cdn_redirect = False
else:
print('UNKNOWN: could not parse the value for --media-cdn-redirect')
sys.exit(nagios.UNKNOWN)
def verify_media_header(header: str, header_dict: dict, good_value: str = None, warn_value: str = None, critical_value: str = None):
"""
If you don't specify good_value, warn_value, or critical_value then the header will only be checked for existience.
"""
# Convert everything to lowercase strings to prevent any wierdness
header_dict = {k.lower(): v for k, v in header_dict.items()}
header = header.lower()
header_value = str(header_dict.get(header))
warn_value = str(warn_value)
critical_value = str(critical_value)
if not header_value:
return f'CRITICAL: missing header\n"{header}"', nagios.CRITICAL
if good_value:
good_value = str(good_value)
if header_value == good_value:
return f'OK: {header}: "{header_value}"', nagios.OK
else:
return f'CRITICAL: {header} is not "{good_value}", is "{header_value}"', nagios.CRITICAL
# elif warn_value and header_value == warn_value:
# return f'WARN: {header}: "{header_value}"', nagios.WARNING
# elif critical_value and header_value == critical_value:
# return f'CRITICAL: {header}: "{header_value}"', nagios.CRITICAL
return f'OK: {header} is present', nagios.OK # with value "{header_value}"'
async def main() -> None:
exit_code = nagios.OK
async def cleanup(client, test_image_path, image_event_id=None):
nonlocal exit_code
# Clean up
if image_event_id:
await client.room_redact(args.room, image_event_id)
os.remove(test_image_path)
await client.close()
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
try:
r = requests.delete(f'{args.admin_endpoint}/_synapse/admin/v1/users/{args.user}/media', headers={'Authorization': f'Bearer {client.access_token}'}, verify=False)
if r.status_code != 200:
if nagios.WARNING < exit_code:
exit_code = nagios.WARNING
return f"WARN: failed to purge media for this user.\n{r.text}"
else:
return None
except Exception as e:
if nagios.WARNING < exit_code:
exit_code = nagios.WARNING
return f"WARN: failed to purge media for this user.\n{e}"
client = AsyncClient(args.hs, args.user, config=AsyncClientConfig(request_timeout=args.timeout, max_timeout_retry_wait_time=10))
if args.auth_file:
# If there are no previously-saved credentials, we'll use the password
if not os.path.exists(args.auth_file):
resp = await client.login(args.pw)
# check that we logged in successfully
if isinstance(resp, LoginResponse):
write_login_details_to_disk(resp, args.hs, args.auth_file)
else:
print(f'CRITICAL: failed to log in.\n{resp}')
sys.exit(nagios.CRITICAL)
else:
# Otherwise the config file exists, so we'll use the stored credentials
with open(args.auth_file, "r") as f:
config = json.load(f)
client = AsyncClient(config["homeserver"])
client.access_token = config["access_token"]
client.user_id = config["user_id"]
client.device_id = config["device_id"]
else:
await client.login(args.pw)
await client.join(args.room)
# Create a random image
imarray = np.random.rand(100, 100, 3) * 255
im = Image.fromarray(imarray.astype('uint8')).convert('RGBA')
_, test_image_path = tempfile.mkstemp()
test_image_path = test_image_path + '.png'
im.save(test_image_path)
# Send the image and get the event ID
image_event_id = (await send_image(client, args.room, test_image_path))
if isinstance(image_event_id, RoomSendError):
await cleanup(client, test_image_path)
print(f'CRITICAL: failed to send message.\n{image_event_id}')
sys.exit(nagios.CRITICAL)
image_event_id = image_event_id.event_id
# Get the event
image_event = (await client.room_get_event(args.room, image_event_id)).event
# convert mxc:// to http://
target_file_url = await client.mxc_to_http(image_event.url)
# Check the file. Ignore the non-async thing here, it doesn't matter in this situation.
# Remember: Cloudflare does not cache non-GET requests.
r = requests.head(target_file_url, allow_redirects=False)
prints = []
if r.status_code != 200 and not args.media_cdn_redirect:
await cleanup(client, test_image_path, image_event_id=image_event_id)
prints.append(f'CRITICAL: status code is "{r.status_code}"')
sys.exit(nagios.CRITICAL)
else:
prints.append(f'OK: status code is "{r.status_code}"')
headers = dict(r.headers)
# Check domain
if args.media_cdn_redirect:
if 'location' in headers:
domain = urllib.parse.urlparse(headers['location']).netloc
if domain != args.check_domain:
exit_code = nagios.CRITICAL
prints.append(f'CRITICAL: redirect to media CDN domain is "{domain}"')
else:
prints.append(f'OK: media CDN domain is "{domain}"')
else:
exit_code = nagios.CRITICAL
prints.append(f'CRITICAL: was not redirected to the media CDN domain.')
# Make sure we aren't redirected if we're a Synapse server
test = requests.head(target_file_url, headers={'User-Agent': 'Synapse/1.77.3'}, allow_redirects=False)
if test.status_code != 200:
prints.append('CRITICAL: Synapse user-agent is redirected with status code', test.status_code)
exit_code = nagios.CRITICAL
else:
prints.append(f'OK: Synapse user-agent is not redirected.')
else:
if 'location' in headers:
exit_code = nagios.CRITICAL
prints.append(f"CRITICAL: recieved 301 to {urllib.parse.urlparse(headers['location']).netloc}")
else:
prints.append(f'OK: was not redirected.')
if args.required_headers:
# Icinga may pass the values as one string
if len(args.required_headers) == 1:
args.required_headers = args.required_headers[0].split(' ')
for item in args.required_headers:
key, value = item.split('=')
header_chk, code = verify_media_header(key, headers, good_value=value)
prints.append(header_chk)
if code > exit_code:
exit_code = code
# results = [verify_media_header('synapse-media-local-status', headers), verify_media_header('synapse-media-s3-status', headers, good_value='200'), verify_media_header('synapse-media-server', headers, good_value='s3')]
# for header_chk, code in results:
# prints.append(header_chk)
# if code > exit_code:
# exit_code = code
clean_msg = await cleanup(client, test_image_path, image_event_id=image_event_id)
if exit_code == nagios.OK:
print('OK: media CDN is good.')
elif exit_code == nagios.UNKNOWN:
print('UNKNOWN: media CDN is bad.')
elif exit_code == nagios.WARNING:
print('WARNING: media CDN is bad.')
elif exit_code == nagios.CRITICAL:
print('CRITICAL: media CDN is bad.')
for msg in prints:
print(msg)
if clean_msg:
print(clean_msg)
sys.exit(exit_code)
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as e:
print(f'UNKNOWN: exception\n{e}')
print(traceback.format_exc())
sys.exit(nagios.UNKNOWN)

View File

@ -2,20 +2,11 @@
import argparse import argparse
import sys import sys
import traceback
import requests from checker import nagios, print_icinga2_check_status
from checker.http import get_with_retry
from checker import nagios from checker.linuxfabric.base import get_state
def check_nginx_status(url):
try:
response = requests.get(url)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print("CRITICAL - Unable to connect to Nginx stub_status: {}".format(e))
sys.exit(2)
def parse_nginx_status(status): def parse_nginx_status(status):
@ -38,49 +29,70 @@ def parse_nginx_status(status):
data["waiting"] = int(parts[5]) data["waiting"] = int(parts[5])
return data return data
except Exception as e: except Exception as e:
print(f'UNKNOWN: failed to parse status page: {e}') print_icinga2_check_status(f'failed to parse status page: {e}', nagios.UNKNOWN)
sys.exit(nagios.UNKNOWN) sys.exit(nagios.UNKNOWN)
def main(): def main():
parser = argparse.ArgumentParser(description="Check Nginx status using stub_status.") parser = argparse.ArgumentParser(description="Check Nginx status using stub_status.")
parser.add_argument("--url", required=True, help="URL to Nginx stub_status.") parser.add_argument("--url", required=True, help="URL to Nginx stub_status.")
parser.add_argument("--critical-active", type=int, default=0, help="Critical threshold for active connections. Default: disabled") parser.add_argument("--critical-active", type=int, default=None, help="Critical threshold for active connections. Default: 0 (disabled)")
parser.add_argument("--warning-active", type=int, default=0, help="Warning threshold for active connections. Default: disabled") parser.add_argument("--warning-active", type=int, default=None, help="Warning threshold for active connections. Default: 0 (disabled)")
parser.add_argument("--critical-waiting", type=int, default=0, help="Critical threshold for waiting connections. Default: disabled") parser.add_argument("--critical-waiting", type=int, default=None, help="Critical threshold for waiting connections. Default: 0 (disabled)")
parser.add_argument("--warning-waiting", type=int, default=0, help="Warning threshold for waiting connections. Default: disabled") parser.add_argument("--warning-waiting", type=int, default=None, help="Warning threshold for waiting connections. Default: 0 (disabled)")
args = parser.parse_args() args = parser.parse_args()
status = check_nginx_status(args.url) status = get_with_retry(args.url).text
data = parse_nginx_status(status) data = parse_nginx_status(status)
status_str = "Active connections: {active_connections}, Waiting: {waiting}, Accepted: {accepted}, Handled: {handled}, Requests: {requests}, Reading: {reading}, Writing: {writing}".format(**data) perfdata_dict = {
"active_connections": {
"value": data["active_connections"],
"warn": args.warning_active,
"crit": args.critical_active,
},
"waiting": {
"value": data["waiting"],
"warn": args.warning_waiting,
"crit": args.critical_waiting,
},
"accepted": {"value": data["accepted"]},
"handled": {"value": data["handled"]},
"requests": {"value": data["requests"]},
"reading": {"value": data["reading"]},
"writing": {"value": data["writing"]},
}
if args.warning_active > 0 and args.critical_active > 0 and args.warning_waiting > 0 and args.critical_waiting > 0: return_code = nagios.STATE_OK
perfdata = "| active_connections={active_connections};{warning};{critical} waiting={waiting};{warning_waiting};{critical_waiting} accepted={accepted} handled={handled} requests={requests} reading={reading} writing={writing}".format( if args.warning_active or args.critical_active:
warning=args.warning_active, active_connections_state = get_state(
critical=args.critical_active, data["active_connections"], args.warning_active, args.critical_active, "ge"
warning_waiting=args.warning_waiting,
critical_waiting=args.critical_waiting,
**data
) )
if data["active_connections"] >= args.critical_active or data["waiting"] >= args.critical_waiting:
print("CRITICAL:", status_str, perfdata)
sys.exit(nagios.CRITICAL)
elif data["active_connections"] >= args.warning_active or data["waiting"] >= args.warning_waiting:
print("WARNING:", status_str, perfdata)
sys.exit(nagios.WARNING)
else: else:
print("OK:", status_str) active_connections_state = nagios.STATE_OK
sys.exit(nagios.OK) return_code = max(active_connections_state, return_code)
if args.warning_waiting or args.critical_waiting:
waiting_state = get_state(data["waiting"], args.warning_waiting, args.critical_waiting, "ge")
else: else:
perfdata = "| active_connections={active_connections} waiting={waiting} accepted={accepted} handled={handled} requests={requests} reading={reading} writing={writing}".format( waiting_state = nagios.STATE_OK
**data return_code = max(waiting_state, return_code)
)
print("OK:", status_str) # if active_connections_state == nagios.STATE_CRIT or waiting_state == nagios.STATE_CRIT:
print('Critical and warning levels disabled.', perfdata) # return_code = nagios.CRITICAL
sys.exit(nagios.OK) # elif active_connections_state == nagios.STATE_WARN or waiting_state == nagios.STATE_WARN:
# return_code = nagios.WARNING
# else:
# return_code = nagios.OK
status_str = "Active connections: {active_connections}, Waiting: {waiting}, Accepted: {accepted}, Handled: {handled}, Requests: {requests}, Reading: {reading}, Writing: {writing}".format(**data)
print_icinga2_check_status(status_str, return_code, perfdata_dict)
sys.exit(return_code)
if __name__ == "__main__": if __name__ == "__main__":
try:
main() main()
except Exception as e:
print_icinga2_check_status(f'exception "{e}" \n {traceback.format_exc()}', nagios.UNKNOWN)
sys.exit(nagios.UNKNOWN)

View File

@ -1,15 +1,17 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import sys import sys
import time
import traceback import traceback
from ipaddress import ip_address, ip_network from ipaddress import ip_network
import numpy as np import numpy as np
import requests import requests
from urllib3.exceptions import InsecureRequestWarning from urllib3.exceptions import InsecureRequestWarning
import checker.nagios as nagios import checker.nagios as nagios
from checker import print_icinga2_check_status
from checker.http import get_with_retry
from checker.linuxfabric.base import get_state
from checker.markdown import list_to_markdown_table from checker.markdown import list_to_markdown_table
from checker.units import filesize from checker.units import filesize
@ -71,36 +73,33 @@ def main():
interface_names[interface['name']] = name interface_names[interface['name']] = name
if not len(interface_names.keys()): if not len(interface_names.keys()):
print(f'UNKNOWN: did not find any valid interface names! Double-check the name.') print_icinga2_check_status('did not find any valid interface names! Double-check the name.', nagios.STATE_UNKNOWN)
sys.exit(nagios.UNKNOWN) sys.exit(nagios.UNKNOWN)
for name, interface in interface_names.items(): for name, interface in interface_names.items():
# Fetch the data # Fetch the data
traffic_data = [] traffic_data = []
for _ in range(args.duration): for _ in range(args.duration):
start_time = time.time() # start_time = time.time()
response = requests.get(f'https://{args.opnsense}/api/diagnostics/traffic/top/{interface}', response = get_with_retry('https://{args.opnsense}/api/diagnostics/traffic/top/{interface}',
headers={'Accept': 'application/json'}, auth=(args.key, args.secret), verify=False, headers={'Accept': 'application/json'}, auth=(args.key, args.secret), verify=False,
timeout=args.timeout) timeout=args.timeout)
end_time = time.time() # end_time = time.time()
api_request_time = end_time - start_time # api_request_time = end_time - start_time
if response.status_code != 200:
print(f'UNKNOWN: unable to query OPNsense API for {interface}: {response.status_code}\n{response.text}')
sys.exit(nagios.UNKNOWN)
if isinstance(response.json(), list): if isinstance(response.json(), list):
print(F'UNKNOWN - OPNsense returned wrong datatype:\n{response.json()}') print_icinga2_check_status(f'OPNsense returned wrong datatype:\n{response.text}', nagios.STATE_UNKNOWN)
sys.exit(nagios.STATE_UNKNOWN)
for item in response.json().get(interface, {}).get('records', False): for item in response.json().get(interface, {}).get('records', False):
if item['address'] == args.host: if item['address'] == args.host:
traffic_data.append(item) traffic_data.append(item)
adjusted_sleep_duration = max(1 - api_request_time, 0) # adjusted_sleep_duration = max(1 - api_request_time, 0)
time.sleep(adjusted_sleep_duration) # time.sleep(adjusted_sleep_duration)
if not len(traffic_data) and args.fail_empty: if not len(traffic_data) and args.fail_empty:
print('UNKNOWN: Interface or host not found in OPNsense API response. Raw response:') print_icinga2_check_status(f'interface or host not found in OPNsense API response. Raw response:\n{traffic_data}', nagios.STATE_UNKNOWN)
print(traffic_data)
sys.exit(nagios.UNKNOWN) sys.exit(nagios.UNKNOWN)
elif not len(traffic_data): elif not len(traffic_data):
# There was no traffic. # There was no traffic.
@ -121,11 +120,7 @@ def main():
'connections': int(np.average([len(x['details']) for x in traffic_data])) 'connections': int(np.average([len(x['details']) for x in traffic_data]))
} }
except Exception as e: except Exception as e:
print(f'UNKNOWN: Failed to parse traffic data: "{e}"') print_icinga2_check_status(f'failed to parse traffic data: {e}\n{traceback.format_exc()}\n{traffic_data}', nagios.STATE_UNKNOWN)
print(traceback.format_exc())
print('')
print('Raw data:')
print(traffic_data)
sys.exit(nagios.UNKNOWN) sys.exit(nagios.UNKNOWN)
warn_b_value = (args.bandwidth * args.bandwidth_warn / 100) * 1e+6 warn_b_value = (args.bandwidth * args.bandwidth_warn / 100) * 1e+6
@ -134,74 +129,53 @@ def main():
exit_code = nagios.OK exit_code = nagios.OK
critical = [] critical = []
warn = [] warn = []
ok = []
perf_data = [] perf_data = []
output_table = [ output_table = [
('Host', 'Interface', 'Rate In', 'Rate Out', 'Cumulative In', 'Cumulative Out', 'Connections', 'Status') ('Host', 'Interface', 'Rate In', 'Rate Out', 'Cumulative In', 'Cumulative Out', 'Connections', 'Status')
] ]
def check_b(name, value): def check_b(name, state, value):
nonlocal exit_code nonlocal exit_code
if value >= crit_b_value: if state == nagios.STATE_CRIT:
critical.append((name, filesize(value))) critical.append((name, filesize(value)))
exit_code = nagios.CRITICAL exit_code = max(nagios.CRITICAL, exit_code)
return '[CRITICAL]', nagios.CRITICAL return '[CRITICAL]', exit_code
elif value >= warn_b_value: elif state == nagios.STATE_WARN:
warn.append((name, filesize(value))) warn.append((name, filesize(value)))
exit_code = nagios.WARNING exit_code = max(nagios.STATE_WARN, exit_code)
return '[WARNING]', nagios.WARNING return '[WARNING]', exit_code
else:
ok.append((name, filesize(value)))
return '[OK]', nagios.OK
for name, data in check_result.items(): for name, data in check_result.items():
status = '[OK]' status = '[OK]'
in_status, in_rc = check_b('rate_in', data['rate_in']) in_state = get_state(data['rate_in'], warn_b_value, crit_b_value, 'ge')
if in_rc >= exit_code: in_status, exit_code = check_b(name, in_state, data['rate_in'])
status = in_status
out_status, out_rc = check_b('rate_out', data['rate_out']) out_state = get_state(data['rate_out'], warn_b_value, crit_b_value, 'ge')
if out_rc >= exit_code: in_status, exit_code = check_b(name, out_state, data['rate_out'])
status = out_status
if data['connections'] >= args.conn_critical > 0: conn_state = get_state(data['connections'], args.conn_warn, args.conn_critical, 'ge')
critical.append(('connections', data['connections'])) conn_status, exit_code = check_b(name, conn_state, data['connections'])
exit_code = nagios.CRITICAL
status = '[CRITICAL]'
elif data['connections'] >= args.conn_warn > 0:
warn.append(('connections', data['connections']))
exit_code = nagios.WARNING
status = '[WARNING]'
else:
ok.append(('connections', data['connections']))
perf_data.append(f'\'{name}_rate_in\'={int(data["rate_in"])}B;{warn_b_value};{crit_b_value};0;') perf_data[f'{name}_rate_in'] = {'value': int(data["rate_in"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'}
perf_data.append(f'\'{name}_rate_out\'={int(data["rate_out"])}B;{warn_b_value};{crit_b_value};0;') perf_data[f'{name}_rate_out'] = {'value': int(data["rate_out"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'}
perf_data.append(f'\'{name}_cumulative_in\'={int(data["cumulative_in"])}B;{warn_b_value};{crit_b_value};0;') perf_data[f'{name}_cumulative_in'] = {'value': int(data["cumulative_in"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'}
perf_data.append(f'\'{name}_cumulative_out\'={int(data["cumulative_out"])}B;{warn_b_value};{crit_b_value};0;') perf_data[f'{name}_cumulative_out'] = {'value': int(data["cumulative_out"]), 'warn': warn_b_value, 'crit': crit_b_value, 'unit': 'B'}
perf_data.append(f'\'{name}_connections\'={int(data["connections"])}B;{warn_b_value};{crit_b_value};0;') perf_data[f'{name}_connections'] = {'value': int(data["connections"]), 'warn': args.conn_warn, 'crit': args.conn_critical, 'unit': 'B'}
output_table.append((args.host, name, filesize(data['rate_in']), filesize(data['rate_out']), output_table.append((args.host, name, filesize(data['rate_in']), filesize(data['rate_out']),
filesize(data['cumulative_in']), filesize(data['cumulative_out']), data['connections'], filesize(data['cumulative_in']), filesize(data['cumulative_out']), data['connections'],
status)) status))
if len(critical): if exit_code == nagios.STATE_OK:
x = ['CRITICAL: '] text_result = f'bandwidth is below {args.bandwidth} Mbps.'
for i in critical: else:
x.append(f'{i[0]}: {i[1]}, ') text_result = ', '.join([*critical, *warn])
print(''.join(x).strip(', ')) if len(check_result) > 1:
if len(warn): text_result += list_to_markdown_table(output_table, align='left', seperator='!', borders=False)
x = ['WARN: ']
for i in warn:
x.append(f'{i[0]}: {i[1]}')
print(''.join(x).strip(', '))
if not len(warn) and not len(critical):
print(f'OK: bandwidth is below {args.bandwidth} Mbps.')
print(list_to_markdown_table(output_table, align='left', seperator='!', borders=False)) print_icinga2_check_status(text_result, exit_code, perf_data)
print(f'| {" ".join(perf_data)}')
sys.exit(exit_code) sys.exit(exit_code)
@ -209,6 +183,5 @@ if __name__ == "__main__":
try: try:
main() main()
except Exception as e: except Exception as e:
print(f'UNKNOWN: exception "{e}"') print_icinga2_check_status(f'exception "{e}"\n{traceback.format_exc()}', nagios.STATE_UNKNOWN)
print(traceback.format_exc())
sys.exit(nagios.UNKNOWN) sys.exit(nagios.UNKNOWN)

View File

@ -7,6 +7,8 @@ import warnings
from cloudflarepycli import cloudflareclass from cloudflarepycli import cloudflareclass
import checker.nagios as nagios import checker.nagios as nagios
from checker.linuxfabric.base import get_state
from checker import print_icinga2_check_status
def main(): def main():
@ -34,36 +36,43 @@ def main():
warnings.simplefilter("ignore", category=RuntimeWarning) warnings.simplefilter("ignore", category=RuntimeWarning)
speedtest_results = cloudflareclass.cloudflare(printit=False).runalltests() speedtest_results = cloudflareclass.cloudflare(printit=False).runalltests()
out_str = f"upload: {speedtest_results['90th_percentile_upload_speed']['value']} Mbps, download: {speedtest_results['90th_percentile_download_speed']['value']} Mbps, latency: {speedtest_results['latency_ms']['value']} ms, jitter: {speedtest_results['Jitter_ms']['value']} ms" upload_speed_state = get_state(speedtest_results['90th_percentile_upload_speed']['value'], args.warn_up, args.critical_up, _operator='le')
perf_data = f"'upload'={speedtest_results['90th_percentile_upload_speed']['value'] * 1e+6}B;{args.warn_up * 1e+6};{args.critical_up * 1e+6};0; 'download'={speedtest_results['90th_percentile_download_speed']['value'] * 1e+6}B;{args.warn_down * 1e+6};{args.critical_down * 1e+6};0; 'latency_ms'={speedtest_results['latency_ms']['value']}ms;{args.warn_latency};{args.critical_latency};0; 'jitter_ms'={speedtest_results['Jitter_ms']['value']}ms;;;0;" download_speed_state = get_state(speedtest_results['90th_percentile_download_speed']['value'], args.warn_down, args.critical_down, _operator='le')
latency_state = get_state(speedtest_results['latency_ms']['value'], args.warn_latency, args.critical_latency, _operator='ge')
exit_code = max(upload_speed_state, download_speed_state, latency_state)
text_result = f"upload: {speedtest_results['90th_percentile_upload_speed']['value']} Mbps, download: {speedtest_results['90th_percentile_download_speed']['value']} Mbps, latency: {speedtest_results['latency_ms']['value']} ms, jitter: {speedtest_results['Jitter_ms']['value']} ms"
exit_code = nagios.OK perfdata = {
'upload': {
'value': speedtest_results['90th_percentile_upload_speed']['value'] * 1e+6,
'warn': args.warn_up * 1e+6,
'crit': args.critical_up * 1e+6,
'min': 0,
'unit': 'B'
},
'download': {
'value': speedtest_results['90th_percentile_download_speed']['value'] * 1e+6,
'warn': args.warn_down * 1e+6,
'crit': args.critical_down * 1e+6,
'min': 0,
'unit': 'B'
},
'latency_ms': {
'value': speedtest_results['latency_ms']['value'],
'warn': args.warn_latency,
'crit': args.critical_latency,
'min': 0,
'unit': 'ms'
},
'jitter_ms': {
'value': speedtest_results['Jitter_ms']['value'],
'min': 0,
'unit': 'ms'
}
if speedtest_results['90th_percentile_upload_speed']['value'] <= args.critical_up and exit_code < nagios.CRITICAL: }
exit_code = nagios.CRITICAL
elif speedtest_results['90th_percentile_upload_speed']['value'] <= args.warn_up and exit_code < nagios.WARNING:
exit_code = nagios.WARNING
if speedtest_results['90th_percentile_download_speed']['value'] <= args.critical_down and exit_code < nagios.CRITICAL: print_icinga2_check_status(text_result, exit_code, perfdata)
exit_code = nagios.CRITICAL
elif speedtest_results['90th_percentile_download_speed']['value'] <= args.warn_down and exit_code < nagios.WARNING:
exit_code = nagios.WARNING
if speedtest_results['latency_ms']['value'] >= args.warn_latency and exit_code < nagios.CRITICAL:
exit_code = nagios.CRITICAL
elif speedtest_results['latency_ms']['value'] >= args.warn_latency and exit_code < nagios.WARNING:
exit_code = nagios.WARNING
if exit_code == nagios.OK:
status_str = 'OK'
elif exit_code == nagios.WARNING:
status_str = 'WARN'
elif exit_code == nagios.CRITICAL:
status_str = 'CRITICAL'
else:
status_str = 'UNKNOWN'
print(f'{status_str} - {out_str} |{perf_data}')
sys.exit(exit_code) sys.exit(exit_code)

View File

@ -1 +1,2 @@
from .print import print_icinga2_check_status, dict_to_perfdata, create_description_list
from .markdown import list_to_markdown_table

30
checker/http.py Normal file
View File

@ -0,0 +1,30 @@
import sys
from time import sleep
import requests
from . import nagios
from .print import print_icinga2_check_status
def get_with_retry(url, retries=3, delay=1, **kwargs):
"""
Wrapper function for requests.get() with a retry mechanism.
:param url: URL to send the GET request
:param retries: Number of retries in case of HTTP failures (default: 3)
:param delay: Time delay between retries in seconds (default: 1)
:param kwargs: Additional keyword arguments for requests.get()
:return: Response object
"""
for i in range(retries):
try:
response = requests.get(url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
if i == retries - 1:
# raise e
print_icinga2_check_status(f'HTTP request failed after {i} retries: {url}\n{e}', nagios.STATE_UNKNOWN)
sys.exit(nagios.STATE_UNKNOWN)
sleep(delay)

624
checker/linuxfabric/base.py Normal file
View File

@ -0,0 +1,624 @@
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Provides very common every-day functions.
"""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2023051201'
import collections
import numbers
import operator
import os
import sys
from traceback import format_exc # pylint: disable=C0413
from ..nagios import STATE_CRIT, STATE_OK, STATE_UNKNOWN, STATE_WARN
WINDOWS = os.name == "nt"
LINUX = sys.platform.startswith("linux")
X86_64 = sys.maxsize > 2 ** 32
def contine_or_exit(result, state=STATE_UNKNOWN):
"""Continue or Exit (CoE)
This is useful if calling complex library functions in your checks
`main()` function. Don't use this in functions.
If a more complex library function, for example `lib.url.fetch()` fails, it
returns `(False, 'the reason why I failed')`, otherwise `(True,
'this is my result'). This forces you to do some error handling.
To keep things simple, use `result = lib.base.coe(lib.url.fetch(...))`.
If `fetch()` fails, your plugin will exit with STATE_UNKNOWN (default) and
print the original error message. Otherwise your script just goes on.
The use case in `main()` - without `coe`:
>>> success, html = lib.url.fetch(URL)
>>> if not success:
>>> print(html) # contains the error message here
>>>> exit(STATE_UNKNOWN)
Or simply:
>>> html = lib.base.coe(lib.url.fetch(URL))
Parameters
----------
result : tuple
The result from a function call.
result[0] = expects the function return code (True on success)
result[1] = expects the function result (could be of any type)
state : int
If result[0] is False, exit with this state.
Default: 3 (which is STATE_UNKNOWN)
Returns
-------
any type
The result of the inner function call (result[1]).
"""
if result[0]:
# success
return result[1]
print(result[1])
sys.exit(state)
def see_you():
"""See you (cu)
Prints a Stacktrace (replacing "<" and ">" to be printable in Web-GUIs), and exits with
STATE_UNKNOWN.
"""
print(format_exc().replace("<", "'").replace(">", "'"))
sys.exit(STATE_UNKNOWN)
def get_perfdata(label, value, uom=None, warn=None, crit=None, _min=None, _max=None):
"""Returns 'label'=value[UOM];[warn];[crit];[min];[max]
"""
msg = "'{}'={}".format(label, value)
if uom is not None:
msg += uom
msg += ';'
if warn is not None:
msg += str(warn)
msg += ';'
if crit is not None:
msg += str(crit)
msg += ';'
if _min is not None:
msg += str(_min)
msg += ';'
if _max is not None:
msg += str(_max)
msg += ' '
return msg
def get_state(value, warn, crit, _operator='ge'):
"""Returns the STATE by comparing `value` to the given thresholds using
a comparison `_operator`. `warn` and `crit` threshold may also be `None`.
>>> get_state(15, 10, 20, 'ge')
1 (STATE_WARN)
>>> get_state(10, 10, 20, 'gt')
0 (STATE_OK)
Parameters
----------
value : float
Numeric value
warn : float
Numeric warning threshold
crit : float
Numeric critical threshold
_operator : string
`eq` = equal to
`ge` = greater or equal
`gt` = greater than
`le` = less or equal
`lt` = less than
`ne` = not equal to
`range` = match range
Returns
-------
int
`STATE_OK`, `STATE_WARN` or `STATE_CRIT`
"""
# make sure to use float comparison
value = float(value)
if _operator == 'ge':
if crit is not None:
if value >= float(crit):
return STATE_CRIT
if warn is not None:
if value >= float(warn):
return STATE_WARN
return STATE_OK
if _operator == 'gt':
if crit is not None:
if value > float(crit):
return STATE_CRIT
if warn is not None:
if value > float(warn):
return STATE_WARN
return STATE_OK
if _operator == 'le':
if crit is not None:
if value <= float(crit):
return STATE_CRIT
if warn is not None:
if value <= float(warn):
return STATE_WARN
return STATE_OK
if _operator == 'lt':
if crit is not None:
if value < float(crit):
return STATE_CRIT
if warn is not None:
if value < float(warn):
return STATE_WARN
return STATE_OK
if _operator == 'eq':
if crit is not None:
if value == float(crit):
return STATE_CRIT
if warn is not None:
if value == float(warn):
return STATE_WARN
return STATE_OK
if _operator == 'ne':
if crit is not None:
if value != float(crit):
return STATE_CRIT
if warn is not None:
if value != float(warn):
return STATE_WARN
return STATE_OK
if _operator == 'range':
if crit is not None:
if not contine_or_exit(match_range(value, crit)):
return STATE_CRIT
if warn is not None:
if not contine_or_exit(match_range(value, warn)):
return STATE_WARN
return STATE_OK
return STATE_UNKNOWN
def get_table(data, cols, header=None, strip=True, sort_by_key=None, sort_order_reverse=False):
"""Takes a list of dictionaries, formats the data, and returns
the formatted data as a text table.
Required Parameters:
data - Data to process (list of dictionaries). (Type: List)
cols - List of cols in the dictionary. (Type: List)
Optional Parameters:
header - The table header. (Type: List)
strip - Strip/Trim values or not. (Type: Boolean)
sort_by_key - The key to sort by. (Type: String)
sort_order_reverse - Default sort order is ascending, if
True sort order will change to descending. (Type: bool)
Inspired by
https://www.calazan.com/python-function-for-displaying-a-list-of-dictionaries-in-table-format/
"""
if not data:
return ''
# Sort the data if a sort key is specified (default sort order is ascending)
if sort_by_key:
data = sorted(data,
key=operator.itemgetter(sort_by_key),
reverse=sort_order_reverse)
# If header is not empty, create a list of dictionary from the cols and the header and
# insert it before first row of data
if header:
header = dict(zip(cols, header))
data.insert(0, header)
# prepare data: decode from (mostly) UTF-8 to Unicode, optionally strip values and get
# the maximum length per column
column_widths = collections.OrderedDict()
for idx, row in enumerate(data):
for col in cols:
try:
if strip:
data[idx][col] = str(row[col]).strip()
else:
data[idx][col] = str(row[col])
except:
return 'Unknown column "{}"'.format(col)
# get the maximum length
try:
column_widths[col] = max(column_widths[col], len(data[idx][col]))
except:
column_widths[col] = len(data[idx][col])
if header:
# Get the length of each column and create a '---' divider based on that length
header_divider = []
for col, width in column_widths.items():
header_divider.append('-' * width)
# Insert the header divider below the header row
header_divider = dict(zip(cols, header_divider))
data.insert(1, header_divider)
# create the output
table = ''
cnt = 0
for row in data:
tmp = ''
for col, width in column_widths.items():
if cnt != 1:
tmp += '{:<{}} ! '.format(row[col], width)
else:
# header row
tmp += '{:<{}}-+-'.format(row[col], width)
cnt += 1
table += tmp[:-2] + '\n'
return table
def get_worst(state1, state2):
"""Compares state1 to state2 and returns result based on the following
STATE_OK < STATE_UNKNOWN < STATE_WARNING < STATE_CRITICAL
It will prioritize any non-OK state.
Note that numerically the above does not hold.
"""
state1 = int(state1)
state2 = int(state2)
if STATE_CRIT in [state1, state2]:
return STATE_CRIT
if STATE_WARN in [state1, state2]:
return STATE_WARN
if STATE_UNKNOWN in [state1, state2]:
return STATE_UNKNOWN
return STATE_OK
def guess_type(v, consumer='python'):
"""Guess the type of a value (None, int, float or string) for different types of consumers
(Python, SQLite etc.).
For Python, use isinstance() to check for example if a number is an integer.
>>> guess_type('1')
1
>>> guess_type('1', 'sqlite')
'integer'
>>> guess_type('1.0')
1.0
>>> guess_type('1.0', 'sqlite')
'real'
>>> guess_type('abc')
'abc'
>>> guess_type('abc', 'sqlite')
'text'
>>>
>>> value_type = lib.base.guess_type(value)
>>> if isinstance(value_type, int) or isinstance(value_type, float):
>>> ...
"""
if consumer == 'python':
if v is None:
return None
try:
return int(v)
except ValueError:
try:
return float(v)
except ValueError:
return str(v)
if consumer == 'sqlite':
if v is None:
return 'string'
try:
int(v)
return 'integer'
except ValueError:
try:
float(v)
return 'real'
except ValueError:
return 'text'
def is_empty_list(l):
"""Check if a list only contains either empty elements or whitespace
"""
return all(s == '' or s.isspace() for s in l)
def is_numeric(value):
"""Return True if value is really numeric (int, float, whatever).
>>> is_numeric(+53.4)
True
>>> is_numeric('53.4')
False
"""
return isinstance(value, numbers.Number)
def lookup_lod(dicts, key, needle, default=None):
"""Search in a list of dictionaries ("lod)" for a value in a given dict key.
Return a default if not found.
>>> dicts = [
... { "name": "Tom", "age": 10 },
... { "name": "Mark", "age": 5 },
... { "name": "Pam", "age": 7 },
... { "name": "Dick", "age": 12 }
... ]
>>> lookup_lod(dicts, 'name', 'Pam')
{'name': 'Pam', 'age': 7}
>>> lookup_lod(dicts, 'name', 'Pamela')
>>>
"""
return next((item for item in dicts if item[key] == needle), None)
def match_range(value, spec):
"""Decides if `value` is inside/outside the threshold spec.
Parameters
----------
spec : str
Nagios range specification
value : int or float
Numeric value
Returns
-------
bool
`True` if `value` is inside the bounds for a non-inverted
`spec`, or outside the bounds for an inverted `spec`. Otherwise `False`.
Inspired by https://github.com/mpounsett/nagiosplugin/blob/master/nagiosplugin/range.py
"""
def parse_range(spec):
"""
Inspired by https://github.com/mpounsett/nagiosplugin/blob/master/nagiosplugin/range.py
+--------+-------------------+-------------------+--------------------------------+
| -w, -c | OK if result is | WARN/CRIT if | lib.base.parse_range() returns |
+--------+-------------------+-------------------+--------------------------------+
| 10 | in (0..10) | not in (0..10) | (0, 10, False) |
+--------+-------------------+-------------------+--------------------------------+
| -10 | in (-10..0) | not in (-10..0) | (0, -10, False) |
+--------+-------------------+-------------------+--------------------------------+
| 10: | in (10..inf) | not in (10..inf) | (10, inf, False) |
+--------+-------------------+-------------------+--------------------------------+
| : | in (0..inf) | not in (0..inf) | (0, inf, False) |
+--------+-------------------+-------------------+--------------------------------+
| ~:10 | in (-inf..10) | not in (-inf..10) | (-inf, 10, False) |
+--------+-------------------+-------------------+--------------------------------+
| 10:20 | in (10..20) | not in (10..20) | (10, 20, False) |
+--------+-------------------+-------------------+--------------------------------+
| @10:20 | not in (10..20) | in 10..20 | (10, 20, True) |
+--------+-------------------+-------------------+--------------------------------+
| @~:20 | not in (-inf..20) | in (-inf..20) | (-inf, 20, True) |
+--------+-------------------+-------------------+--------------------------------+
| @ | not in (0..inf) | in (0..inf) | (0, inf, True) |
+--------+-------------------+-------------------+--------------------------------+
"""
def parse_atom(atom, default):
if atom == '':
return default
if '.' in atom:
return float(atom)
return int(atom)
if spec is None or str(spec).lower() == 'none':
return (True, None)
if not isinstance(spec, str):
spec = str(spec)
invert = False
if spec.startswith('@'):
invert = True
spec = spec[1:]
if ':' in spec:
try:
start, end = spec.split(':')
except:
return (False, 'Not using range definition correctly')
else:
start, end = '', spec
if start == '~':
start = float('-inf')
else:
start = parse_atom(start, 0)
end = parse_atom(end, float('inf'))
if start > end:
return (False, 'Start %s must not be greater than end %s' % (start, end))
return (True, (start, end, invert))
if spec is None or str(spec).lower() == 'none':
return (True, True)
success, result = parse_range(spec)
if not success:
return (success, result)
start, end, invert = result
if isinstance(value, (str, bytes)):
value = float(value.replace('%', ''))
if value < start:
return (True, False ^ invert)
if value > end:
return (True, False ^ invert)
return (True, True ^ invert)
def over_and_out(msg, state=STATE_OK, perfdata='', always_ok=False):
"""Over and Out (OaO)
Print the stripped plugin message. If perfdata is given, attach it
by `|` and print it stripped. Exit with `state`, or with STATE_OK (0) if
`always_ok` is set to `True`.
"""
if perfdata:
print(msg.strip() + '|' + perfdata.strip())
else:
print(msg.strip())
if always_ok:
sys.exit(STATE_OK)
sys.exit(state)
def smartcast(value):
"""Returns the value converted to float if possible, else string, else the
uncasted value.
"""
for test in [float, str]:
try:
return test(value)
except ValueError:
continue
# No match
return value
def sort(array, reverse=True, sort_by_key=False):
"""Sort a simple 1-dimensional dictionary
"""
if isinstance(array, dict):
if not sort_by_key:
return sorted(array.items(), key=lambda x: x[1], reverse=reverse)
return sorted(array.items(), key=lambda x: str(x[0]).lower(), reverse=reverse)
return array
def state2str(state, empty_ok=True, prefix='', suffix=''):
"""Return the state's string representation.
The square brackets around the state cause icingaweb2 to color the state.
>> lib.base.state2str(2)
'[CRIT]'
>>> state2str(0)
''
>>> state2str(0, empty_ok=False)
'[OK]'
>>> state2str(0, empty_ok=False, suffix=' ')
'[OK] '
>>> state2str(0, empty_ok=False, prefix=' (', suffix=')')
' ([OK])'
"""
state = int(state)
if state == STATE_OK and empty_ok:
return ''
if state == STATE_OK and not empty_ok:
return '{}[OK]{}'.format(prefix, suffix)
if state == STATE_WARN:
return '{}[WARNING]{}'.format(prefix, suffix)
if state == STATE_CRIT:
return '{}[CRITICAL]{}'.format(prefix, suffix)
if state == STATE_UNKNOWN:
return '{}[UNKNOWN]{}'.format(prefix, suffix)
return state
def str2state(string, ignore_error=True):
"""Return the numeric state based on a (case-insensitive) string.
Matches up to the first four characters.
>>> str2state('ok')
0
>>> str2state('okidoki')
3
>>> str2state('okidoki', ignore_error=False)
None
>>> str2state('war')
3
>>> str2state('warn')
1
>>> str2state('Warnung')
1
>>> str2state('CrITical')
2
>>> str2state('UNKNOWN')
3
>>> str2state('gobbledygook')
3
>>> str2state('gobbledygook', ignore_error=False)
None
"""
string = str(string).lower()[0:4]
if string == 'ok':
return STATE_OK
if string == 'warn':
return STATE_WARN
if string == 'crit':
return STATE_CRIT
if string == 'unkn':
return STATE_UNKNOWN
if ignore_error:
return STATE_UNKNOWN
return None
def sum_dict(dict1, dict2):
"""Sum up two dictionaries, maybe with different keys.
>>> sum_dict({'in': 100, 'out': 10}, {'in': 50, 'error': 5, 'uuid': '1234-xyz'})
{'in': 150, 'error': 5, 'out': 10}
"""
total = {}
for key, value in dict1.items():
if not is_numeric(value):
continue
if key in total:
total[key] += value
else:
total[key] = value
for key, value in dict2.items():
if not is_numeric(value):
continue
if key in total:
total[key] += value
else:
total[key] = value
return total
def sum_lod(mylist):
"""Sum up a list of (simple 1-dimensional) dictionary items.
sum_lod([{'in': 100, 'out': 10}, {'in': 50, 'out': 20}, {'error': 5, 'uuid': '1234-xyz'}])
>>> {'in': 150, 'out': 30, 'error': 5}
"""
total = {}
for mydict in mylist:
for key, value in mydict.items():
if not is_numeric(value):
continue
if key in total:
total[key] += value
else:
total[key] = value
return total

View File

@ -0,0 +1,222 @@
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Functions to convert raw numbers, times etc. to a human readable representation (and sometimes
back).
"""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2023051201'
import math
def bits2human(n, _format='%(value).1f%(symbol)s'):
"""Converts n bits to a human readable format.
>>> bits2human(8191)
'1023.9B'
>>> bits2human(8192)
'1.0KiB'
"""
symbols = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
prefix = {}
prefix['B'] = 8
for i, s in enumerate(symbols[1:]):
prefix[s] = 1024 ** (i + 1) * 8
for symbol in reversed(symbols):
if n >= prefix[symbol]:
value = float(n) / prefix[symbol] # pylint: disable=W0641
return _format % locals()
return _format % dict(symbol=symbols[0], value=n)
def bps2human(n, _format='%(value).1f%(symbol)s'):
"""Converts n bits per scond to a human readable format.
>>> bps2human(72000000)
'72Mbps'
"""
symbols = ('bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps', 'Pbps', 'Ebps', 'Zbps', 'Ybps')
prefix = {}
for i, s in enumerate(symbols[1:]):
prefix[s] = 1000 ** (i + 1)
for symbol in reversed(symbols[1:]):
if n >= prefix[symbol]:
value = float(n) / prefix[symbol] # pylint: disable=W0641
return _format % locals()
return _format % dict(symbol=symbols[0], value=n)
def bytes2human(n, _format='%(value).1f%(symbol)s'):
"""Converts n bytes to a human readable format.
>>> bytes2human(1023)
'1023.0B'
>>> bytes2human(1024)
'1.0KiB'
https://github.com/giampaolo/psutil/blob/master/psutil/_common.py
"""
symbols = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
prefix = {}
for i, s in enumerate(symbols[1:]):
# Returns 1 with the bits shifted to the left by (i + 1)*10 places
# (and new bits on the right-hand-side are zeros). This is the same
# as multiplying x by 2**y.
prefix[s] = 1 << (i + 1) * 10
for symbol in reversed(symbols[1:]):
if n >= prefix[symbol]:
value = float(n) / prefix[symbol] # pylint: disable=W0641
return _format % locals()
return _format % dict(symbol=symbols[0], value=n)
def human2bytes(string, binary=True):
"""Converts a string such as '3.072GiB' to 3298534883 bytes. If "binary" is set to True
(default due to Microsoft), it will use powers of 1024, otherwise powers of 1000 (decimal).
Returns 0 on failure.
Works with:
* 3.072GiB (always multiplied by 1024)
* 3.072GB (multiplied by 1024 if binary == True, else multiplied by 1000)
* 3.072G (multiplied by 1024 if binary == True, else multiplied by 1000)
"""
try:
string = string.lower()
if 'kib' in string:
return int(float(string.replace('kib', '').strip()) * 1024)
if 'mib' in string:
return int(float(string.replace('mib', '').strip()) * 1024 ** 2)
if 'gib' in string:
return int(float(string.replace('gib', '').strip()) * 1024 ** 3)
if 'tib' in string:
return int(float(string.replace('tib', '').strip()) * 1024 ** 4)
if 'pib' in string:
return int(float(string.replace('pib', '').strip()) * 1024 ** 5)
if 'k' in string: # matches "kb" or "k"
string = string.replace('kb', '').replace('k', '').strip()
if binary:
return int(float(string) * 1024)
return int(float(string) * 1000)
if 'm' in string: # matches "mb" or "m"
string = string.replace('mb', '').replace('m', '').strip()
if binary:
return int(float(string) * 1024 ** 2)
return int(float(string) * 1000 ** 2)
if 'g' in string: # matches "gb" or "g"
string = string.replace('gb', '').replace('g', '').strip()
if binary:
return int(float(string) * 1024 ** 3)
return int(float(string) * 1000 ** 3)
if 't' in string: # matches "tb" or "t"
string = string.replace('tb', '').replace('t', '').strip()
if binary:
return int(float(string) * 1024 ** 4)
return int(float(string) * 1000 ** 4)
if 'p' in string: # matches "pb" or "p"
string = string.replace('pb', '').replace('p', '').strip()
if binary:
return int(float(string) * 1024 ** 5)
return int(float(string) * 1000 ** 5)
if 'b' in string:
return int(float(string.replace('b', '').strip()))
return 0
except:
return 0
def number2human(n):
"""
>>> number2human(123456.8)
'123K'
>>> number2human(123456789.0)
'123M'
>>> number2human(9223372036854775808)
'9.2E'
"""
# according to the SI Symbols at
# https://en.wikipedia.org/w/index.php?title=Names_of_large_numbers&section=5#Extensions_of_the_standard_dictionary_numbers
millnames = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
try:
n = float(n)
except:
return n
millidx = max(0, min(len(millnames) - 1,
int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3))))
return '{:.1f}{}'.format(n / 10 ** (3 * millidx), millnames[millidx])
def seconds2human(seconds, keep_short=True, full_name=False):
"""Returns a human readable time range string for a number of seconds.
>>> seconds2human(0.125)
'0.12s'
>>> seconds2human(1)
'1s'
>>> seconds2human(59)
'59s'
>>> seconds2human(60)
'1m'
>>> seconds2human(61)
'1m 1s'
>>> seconds2human(1387775)
'2W 2D'
>>> seconds2human('1387775')
'2W 2D'
>>> seconds2human('1387775', full_name=True)
'2weeks 2days'
>>> seconds2human(1387775, keep_short=False, full_name=True)
'2weeks 2days 1hour 29minutes 35seconds'
"""
seconds = float(seconds)
if full_name:
symbols = (
('years', 60 * 60 * 24 * 365),
('months', 60 * 60 * 24 * 30),
('weeks', 60 * 60 * 24 * 7),
('days', 60 * 60 * 24),
('hours', 60 * 60),
('minutes', 60),
('seconds', 1),
('millisecs', 1e-3),
('microsecs', 1e-6),
('nanosecs', 1e-9),
('picosecs', 1e-12),
)
else:
symbols = (
('Y', 60 * 60 * 24 * 365),
('M', 60 * 60 * 24 * 30),
('W', 60 * 60 * 24 * 7),
('D', 60 * 60 * 24),
('h', 60 * 60),
('m', 60),
('s', 1),
('ms', 1e-3),
('us', 1e-6),
('ns', 1e-9),
('ps', 1e-12),
)
result = []
for name, count in symbols:
value = seconds // count
if value:
seconds -= value * count
if full_name and value == 1:
name = name.rstrip('s') # "days" becomes "day"
result.append('{:.0f}{}'.format(value, name))
if len(result) > 2 and keep_short:
return ' '.join(result[:2])
return ' '.join(result)

View File

@ -1,4 +1,5 @@
UNKNOWN = -1 # TODO: remove non STATE_ vars
OK = 0 UNKNOWN = STATE_UNKNOWN = -1
WARNING = 1 OK = STATE_OK = 0
CRITICAL = 2 WARNING = STATE_WARN = 1
CRITICAL = STATE_CRIT = 2

109
checker/print.py Normal file
View File

@ -0,0 +1,109 @@
def create_description_list(data: list):
"""
Create a Description List HTML element based on the input list.
Args:
data (list): A list containing lists, where each inner list has two elements:
the description term and the description details.
Returns:
str: A string containing the HTML representation of the description list.
Example:
data = [
["Description Term 1", "Description Details 1"],
["Description Term 2", "Description Details 2"],
["Description Term 3", "Description Details 3"],
]
html = create_description_list(data)
print(html)
"""
html = "<dl>"
for item in data:
html += f"<dt>{item[0]}</dt><dd>{item[1]}</dd>"
html += "</dl>"
return html
def dict_to_perfdata(data: dict, replace_dashes=True):
"""
Transforms a dictionary into Icinga2 perfdata format.
The input dictionary should have the following structure:
{
"item1": {
"value": any_type (required),
"warn": any_type (optional),
"crit": any_type (optional),
"min": any_type (optional),
"max": any_type (optional),
"unit": string (optional),
},
...
}
:param data: A dictionary containing the perfdata items and their values.
:type data: dict
:param replace_dashes: If True, replace dashes in perfdata names/keys with underscores. Default is True.
:type replace_dashes: bool
:return: A string in Icinga2 perfdata format.
:rtype: str
"""
perfdata = []
for key, values in data.copy().items():
# Remove values that are None
for k, v in values.copy().items():
if v is None:
del values[k]
if replace_dashes:
key = key.replace('-', '_')
item = f"{key.strip()}={values['value']}{values.get('unit', '')}"
for k in ['warn', 'crit', 'min', 'max']:
if k in values:
item += f";{values[k]}"
else:
item += ";"
perfdata.append(item)
return " ".join(perfdata)
def print_icinga2_check_status(text_result: str, return_code: int, perfdata=None):
"""
Prints the status of an Icinga2 check with the given text result, return code, and perfdata.
Example:
sample_text_result = "check is good, time 120s"
sample_return_code = 0
sample_perfdata = {
"time": {
"value": 120,
"warn": 3,
"crit": 100,
"min": 0,
"max": 200,
},
}
print_icinga2_check_status(sample_text_result, sample_return_code, sample_perfdata)
:param text_result: The text result of the check command.
:type text_result: str
:param return_code: The return code of the check (UNKNOWN=-1, OK=0, WARNING=1, CRITICAL=2).
:type return_code: int
:param perfdata: A dictionary containing the perfdata items and their values, defaults to an empty dictionary.
:type perfdata: dict, optional
:raises ValueError: If an invalid return code is passed.
"""
if perfdata is None:
perfdata = {}
status_codes = {
-1: "UNKNOWN",
0: "OK",
1: "WARNING",
2: "CRITICAL",
}
if return_code not in status_codes:
raise ValueError(f"Invalid return code: {return_code}")
status = status_codes[return_code]
perfdata_str = f' | {dict_to_perfdata(perfdata)}' if perfdata else ''
print(f"{status} - {text_result.strip()}{perfdata_str}")