2023-11-22 23:35:33 -07:00
import argparse
import json
import logging
import time
import requests
from urllib3 . exceptions import InsecureRequestWarning
requests . packages . urllib3 . disable_warnings ( category = InsecureRequestWarning )
def _fetch_checks(url, obj, auth, verify, timeout, attempts=3, retry_delay=60):
    """Return the ``results`` list for one Icinga2 object type, or None.

    Performs ``GET <url>/objects/<obj>``. Connection failures and timeouts are
    retried up to ``attempts`` times, sleeping ``retry_delay`` seconds after
    each failure. None is returned when every attempt failed, so the caller
    never works on a missing or stale response. HTTP error statuses are not
    retried: ``raise_for_status()`` propagates them to the caller.
    """
    for _ in range(attempts):
        try:
            response = requests.get(url + "/objects/" + obj, auth=auth, verify=verify, timeout=timeout)
            response.raise_for_status()
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            logging.error(f'Failed to reach Icinga2: {e}')
            logging.error(f'Sleeping {retry_delay} seconds')
            time.sleep(retry_delay)
        else:
            return response.json()["results"]
    return None


def _build_check_filter(obj, check):
    """Build the Icinga2 filter expression selecting exactly ``check``."""
    if obj == 'services':
        return f'host.name=="{check["attrs"]["host_name"]}" && service.name=="{check["attrs"]["name"]}"'
    if obj == 'hosts':
        return f'host.name=="{check["name"]}"'
    raise Exception(f'Unsupported object type: {obj}')


def _post_action(url, action, payload, auth, verify, timeout):
    """POST ``payload`` as JSON to ``<url>/actions/<action>``; raise on HTTP errors."""
    headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
    response = requests.post(url + "/actions/" + action, data=json.dumps(payload), headers=headers, auth=auth, verify=verify, timeout=timeout)
    response.raise_for_status()


def main(args):
    """Poll the Icinga2 API forever and force-fail every overdue active check.

    Each scan lists all hosts and services. An active check is treated as
    overdue when more than ``check_interval + check_timeout + 60`` seconds
    have passed since its ``last_check``. For such a check a failing result
    is submitted four times and a forced reschedule is requested.

    Args:
        args: Parsed command-line namespace providing ``api``, ``username``,
            ``password``, ``insecure``, ``default_timeout`` and ``interval``.

    Raises:
        requests.exceptions.HTTPError: If the API answers with an error status.
        requests.exceptions.RequestException: If submitting a check result or
            a reschedule request fails at the transport level.

    This function does not return; it sleeps ``args.interval`` seconds between
    scans.
    """
    # Icinga2 API URL
    url = f"{args.api}/v1"
    logging.basicConfig(level=logging.INFO)

    auth = (args.username, args.password)
    verify = not args.insecure
    # Without a timeout a stalled connection would block the watchdog forever.
    request_timeout = 60

    while True:
        current_time = time.time()

        for obj in ("hosts", "services"):
            checks = _fetch_checks(url, obj, auth, verify, request_timeout)
            if checks is None:
                # All retries failed: skip this object type for this scan
                # rather than parsing an unbound or stale response.
                logging.error(f'Could not fetch {obj} from Icinga2, skipping until the next scan.')
                continue

            # Loop through all checks
            for check in checks:
                attrs = check['attrs']
                if not attrs['active']:
                    continue

                last_check_time = attrs["last_check"]
                check_interval = attrs["check_interval"]
                check_timeout = attrs["check_timeout"] or args.default_timeout

                # Not overdue yet (60 seconds of grace on top of interval + timeout).
                if current_time - last_check_time <= check_interval + check_timeout + 60:
                    continue

                next_check_time = last_check_time + check_interval + check_timeout
                overdue_seconds = int(current_time - next_check_time)
                check_filter = _build_check_filter(obj, check)

                # Exit status 3 for services, 1 for hosts.
                # NOTE(review): per the Icinga2 API docs these mean UNKNOWN and DOWN - confirm.
                failure_result = {
                    "type": check['type'],
                    "filter": check_filter,
                    "exit_status": 3 if check['type'] == 'Service' else 1,
                    "plugin_output": f"<Check is overdue for {overdue_seconds} seconds.>",
                }
                # Trigger a full failure.
                # NOTE(review): four submissions presumably exceed max_check_attempts
                # so the object reaches a hard state - confirm against the config.
                for _ in range(4):
                    _post_action(url, "process-check-result", failure_result, auth, verify, request_timeout)
                    time.sleep(3)

                # Rerun the check
                reschedule = {
                    "type": check['type'],
                    "filter": check_filter,
                    'force': True,
                }
                _post_action(url, "reschedule-check", reschedule, auth, verify, request_timeout)

                logging.info(f'Failed {check["type"].lower()} {check["name"]} - {overdue_seconds} seconds overdue.')

        time.sleep(args.interval)
# Script entry point: parse the command-line options and start the watchdog loop.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--api', default='https://localhost:5665', help='Base Icinga2 API.')
    parser.add_argument('--insecure', action='store_true', help='Disable SSL verification.')
    parser.add_argument('--username', default='icingaweb2', help='API username.')
    parser.add_argument('--password', required=True, help='API password.')
    parser.add_argument('--default-timeout', default=600, type=int, help='If a check does not have a timeout set, use this many seconds as the default. Default: 600 (10 minutes).')
    # Every scan covers both hosts and services, so the help text says so.
    parser.add_argument('--interval', default=900, type=int, help='Interval between host and service scans. Default: 900 (15 minutes).')
    args = parser.parse_args()
    main(args)