icinga2-checks/check_systemd_timer.py

155 lines
6.2 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import re
import subprocess
import sys
import traceback
from datetime import datetime, timedelta
from typing import Tuple, Union
import humanfriendly
from dateparser import parse
from dateutil import tz
from pydantic import BaseModel
from checker import nagios
from checker.result import quit_check
sys.path.insert(0, "/usr/lib/python3/dist-packages")
import dbus
SYSTEMCTL_STATUS_RE = r'Loaded:\s.*\.timer;\s(.*?);.*?\)\s*Active:\s(.*?) since.*?\s*Trigger:\s(.*?);'
class TimerInfo(BaseModel):
next: datetime
time_left: timedelta
last: Union[datetime, None]
since_last: Union[timedelta, None]
unit: str
enabled: bool
active: bool
status: Union[str, None]
def get_last_trigger(timer_name: str):
output = subprocess.check_output("systemctl list-timers --all", shell=True).decode('utf-8')
lines = output.strip().split("\n")
for line in lines[1:]:
fields = line.split()
if timer_name in fields:
if len(fields) < 14:
# Timer has not been run yet.
return None
try:
return parse(fields[7] + ' ' + fields[8] + ' ' + fields[9] + ' ' + fields[10])
except IndexError:
print(fields)
raise
def get_next_elapse(timer_name: str) -> Tuple[TimerInfo | None, None | str]:
try:
output = subprocess.check_output(["systemctl", "status", timer_name], universal_newlines=True)
if timer_name in output.split('\n')[0]:
try:
parts = re.search(SYSTEMCTL_STATUS_RE, output)
next_trigger = parse(parts.group(3))
now = datetime.now(tz=next_trigger.tzinfo)
time_left = next_trigger - now
last_trigger = get_last_trigger(timer_name)
if last_trigger is not None:
since_last = last_trigger - now
else:
since_last = None
print(since_last)
return TimerInfo(
next=next_trigger,
time_left=time_left,
last=last_trigger,
since_last=since_last,
enabled=parts.group(1).lower() == 'enabled',
active=parts.group(2).split(' ')[0].lower() == 'active',
status=parts.group(2).split(' ')[-1].lower().strip('(').strip(')') if '(' in parts.group(2).split(' ')[-1] else None,
unit=timer_name,
), None
except Exception:
print(output)
traceback.print_exc()
sys.exit(nagios.STATE_UNKNOWN)
return None, 'Timer not found'
except subprocess.CalledProcessError as e:
return None, f'systemctl status failed: {e}'
def check_timer(timer_name: str, expected_interval: int = None):
if not timer_name.endswith('.timer'):
timer_name = timer_name + '.timer'
try:
system_bus = dbus.SystemBus()
systemd1 = system_bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = dbus.Interface(systemd1, 'org.freedesktop.systemd1.Manager')
timer_unit_path = manager.GetUnit(timer_name)
timer_unit = system_bus.get_object('org.freedesktop.systemd1', timer_unit_path)
timer_properties = dbus.Interface(timer_unit, 'org.freedesktop.DBus.Properties')
active_state = timer_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
if active_state == 'active':
next_elapse, err = get_next_elapse(timer_name)
if err:
quit_check(f'{err}', nagios.STATE_UNKNOWN)
# if (next_elapse.left is not None and next_elapse.passed is not None) and (next_elapse.left < 0 or next_elapse.passed < 0):
# quit_check(f'Timer is negative??? Left: {next_elapse["left"]}. Passed: {next_elapse["passed"]}', nagios.STATE_UNKNOWN)
next_elapse_human = next_elapse.next.replace(tzinfo=tz.tzlocal()).strftime('%a %Y-%m-%d %H:%M %Z')
remaining_time_human = humanfriendly.format_timespan(next_elapse.time_left)
since_last_human = humanfriendly.format_timespan(next_elapse.since_last) if next_elapse.since_last else 'N/A'
perfdata_dict = {
'remaining_time': {
'value': next_elapse.time_left.seconds,
'unit': 's',
'min': 0
},
'time_since_last': {
'value': next_elapse.since_last.seconds if next_elapse.since_last is not None else 0,
'unit': 's',
'min': 0
}
}
timer_info = f'Next trigger time: {next_elapse_human}. Time until next trigger: {remaining_time_human}. Time since last trigger: {since_last_human}.'
if expected_interval is not None and next_elapse.next and next_elapse.last:
next_trigger_time = next_elapse.next
actual_interval = next_trigger_time - next_elapse.last
actual_interval_seconds = actual_interval.total_seconds()
if actual_interval_seconds > expected_interval:
quit_check(f'{timer_name} is active but the last trigger was more than the expected interval ago. {timer_info}', nagios.STATE_CRIT, perfdata_dict)
quit_check(f'{timer_name}{" is active" + (" (" + next_elapse.status + ")" if next_elapse else "") + "." if next_elapse.active else " -> "} {timer_info}', nagios.STATE_OK, perfdata_dict)
else:
quit_check(f'{timer_name} is not enabled', nagios.STATE_CRIT)
except dbus.exceptions.DBusException:
quit_check(f'{timer_name} does not exist or is disabled', nagios.STATE_CRIT)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-t', '--timer', required=True, help='The name of the timer to check.')
parser.add_argument('-i', '--interval', type=int, help='The expected interval between timer triggers in seconds.')
args = parser.parse_args()
try:
check_timer(args.timer, args.interval)
except Exception as e:
print(f'UNKNOWN - exception "{e}"')
traceback.print_exc()
sys.exit(nagios.STATE_UNKNOWN)