import argparse import json import logging import time import requests from urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) def main(args): # Icinga2 API URL url = f"{args.api}/v1" logging.basicConfig(level=logging.INFO) while True: current_time = time.time() # Get all checks objects = ["hosts", "services"] for object in objects: response = requests.get(url + "/objects/" + object, auth=(args.username, args.password), verify=not args.insecure) response.raise_for_status() checks = response.json()["results"] # Loop through all checks for check in checks: if not check['attrs']['active']: continue last_check_time = check["attrs"]["last_check"] check_interval = check["attrs"]["check_interval"] check_timeout = check["attrs"]["check_timeout"] or args.default_timeout # If the check is overdue. if current_time - last_check_time > check_interval + check_timeout + 60: headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} next_check_time = last_check_time + check_interval + check_timeout if object == 'services': check_filter = f'host.name=="{check["attrs"]["host_name"]}" && service.name=="{check["attrs"]["name"]}"' elif object == 'hosts': check_filter = f'host.name=="{check["name"]}"' else: raise Exception # Set the check to unknown data = { "type": check['type'], "filter": check_filter, "exit_status": 3 if check['type'] == 'Service' else 1, "plugin_output": f"", } # Trigger a full failure. for _ in range(4): response = requests.post(url + "/actions/process-check-result", data=json.dumps(data), headers=headers, auth=(args.username, args.password), verify=not args.insecure) response.raise_for_status() time.sleep(3) # Rerun the check data = { "type": check['type'], "filter": check_filter, 'force': True } response = requests.post(url + "/actions/reschedule-check", data=json.dumps(data), headers=headers, auth=(args.username, args.password), verify=not args.insecure) response.raise_for_status() logging.info(f'Failed {check["type"].lower()} {check["name"]} - {int(current_time - next_check_time)} seconds overdue.') time.sleep(args.interval) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--api', default='https://localhost:5665', help='Base Icinga2 API.') parser.add_argument('--insecure', action='store_true', help='Disable SSL verification.') parser.add_argument('--username', default='icingaweb2', help='API username.') parser.add_argument('--password', required=True, help='API password.') parser.add_argument('--default-timeout', default=600, type=int, help='If a check does not have a timeout set, use this many seconds as the default. Default: 600 (10 minutes).') parser.add_argument('--interval', default=900, type=int, help='Interval between service scans. Default: 900 (15 minutes).') args = parser.parse_args() main(args)