icinga2-checks/check_speedtest.py

86 lines
3.6 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import sys
import traceback
import warnings
from cloudflarepycli import cloudflareclass
import checker.nagios as nagios
from checker.linuxfabric.base import get_state
from checker import print_icinga2_check_status
def main():
    """
    Run a Cloudflare speed test and report the outcome as an Icinga2 check.

    The speed test invocation is adapted from cloudflarepycli's __main__.py:
    https://github.com/tevslin/cloudflarepycli/blob/main/src/cloudflarepycli/__main__.py

    Command-line thresholds (all parsed as integers):
        --warn-up / --critical-up            upload speed in Mbps (default 0)
        --warn-down / --critical-down        download speed in Mbps (default 0)
        --warn-latency / --critical-latency  latency in ms (default 60 / 100)

    The status line and perfdata are handed to print_icinga2_check_status(),
    then the process terminates through sys.exit() with the maximum of the
    upload, download and latency state values.
    """
    # (flag, default, help text) -- registered in this order so that the
    # generated --help output keeps the same layout.
    threshold_options = (
        ('--warn-up', 0, 'Warn level of upload speed in Mbps.'),
        ('--critical-up', 0, 'Critical level of upload speed in Mbps.'),
        ('--warn-down', 0, 'Warn level of download speed in Mbps.'),
        ('--critical-down', 0, 'Critical level of download speed in Mbps.'),
        ('--warn-latency', 60, 'Warn level of latency in ms.'),
        ('--critical-latency', 100, 'Critical level of latency in ms.'),
    )
    parser = argparse.ArgumentParser(description='Check internet speed.')
    for flag, default, help_text in threshold_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)
    args = parser.parse_args()

    # The speed test can emit these numpy RuntimeWarnings; silence them while
    # it runs:
    #   lib/python3.10/site-packages/numpy/core/fromnumeric.py:3464: RuntimeWarning: Mean of empty slice.
    #     return _methods._mean(a, axis=axis, dtype=dtype,
    #   lib/python3.10/site-packages/numpy/core/_methods.py:192: RuntimeWarning: invalid value encountered in scalar divide
    #     ret = ret.dtype.type(ret / rcount)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        tester = cloudflareclass.cloudflare(printit=False)
        speedtest_results = tester.runalltests()

    # Each measurement is looked up and evaluated in turn (upload, download,
    # latency, jitter) so a missing key or a bad value surfaces in that order.
    # NOTE(review): 'le' presumably alerts when the speed is at or below the
    # threshold and 'ge' when latency is at or above it -- confirm against
    # checker.linuxfabric.base.get_state.
    upload_mbps = speedtest_results['90th_percentile_upload_speed']['value']
    upload_speed_state = get_state(upload_mbps, args.warn_up, args.critical_up, _operator='le')

    download_mbps = speedtest_results['90th_percentile_download_speed']['value']
    download_speed_state = get_state(download_mbps, args.warn_down, args.critical_down, _operator='le')

    latency_ms = speedtest_results['latency_ms']['value']
    latency_state = get_state(latency_ms, args.warn_latency, args.critical_latency, _operator='ge')

    jitter_ms = speedtest_results['Jitter_ms']['value']

    exit_code = max(upload_speed_state, download_speed_state, latency_state)

    text_result = (
        f"upload: {upload_mbps} Mbps, "
        f"download: {download_mbps} Mbps, "
        f"latency: {latency_ms} ms, "
        f"jitter: {jitter_ms} ms"
    )

    # Throughput figures and their thresholds are scaled from Mbps by 1e+6
    # for the perfdata.
    # NOTE(review): Mbps * 1e+6 looks like bits per second, yet the perfdata
    # unit is 'B' -- confirm this is what the graphing side expects.
    scale = 1e+6

    def throughput_metric(mbps, warn_mbps, crit_mbps):
        # Perfdata entry for one transfer direction.
        return {
            'value': mbps * scale,
            'warn': warn_mbps * scale,
            'crit': crit_mbps * scale,
            'min': 0,
            'unit': 'B',
        }

    perfdata = {
        'upload': throughput_metric(upload_mbps, args.warn_up, args.critical_up),
        'download': throughput_metric(download_mbps, args.warn_down, args.critical_down),
        'latency_ms': {
            'value': latency_ms,
            'warn': args.warn_latency,
            'crit': args.critical_latency,
            'min': 0,
            'unit': 'ms',
        },
        # Jitter is reported for graphing only; it has no thresholds.
        'jitter_ms': {
            'value': jitter_ms,
            'min': 0,
            'unit': 'ms',
        },
    }

    print_icinga2_check_status(text_result, exit_code, perfdata)
    sys.exit(exit_code)
if __name__ == "__main__":
    # Last-resort handler: any unexpected exception from the check is reported
    # on stdout (summary line first, full traceback after it) and the process
    # exits with the UNKNOWN status from checker.nagios.
    # sys.exit() inside main() raises SystemExit, which is not an Exception
    # subclass, so regular exits pass through untouched.
    try:
        main()
    except Exception as err:
        summary = f'UNKNOWN: exception "{err}"'
        details = traceback.format_exc()
        print(summary)
        print(details)
        sys.exit(nagios.UNKNOWN)