#!/usr/bin/env python3
"""Nagios-style check for a systemd timer unit.

The timer's ActiveState is read over D-Bus; the next and last trigger times
are scraped from ``systemctl status`` and ``systemctl list-timers``. The
result is reported through ``checker.result.quit_check``.
"""
import argparse
import re
import subprocess
import sys
import traceback
from datetime import datetime, timedelta
from typing import Tuple, Union

import humanfriendly
from dateparser import parse
from dateutil import tz
from pydantic import BaseModel

from checker import nagios
from checker.result import quit_check

# NOTE(review): presumably added so the distribution-packaged dbus bindings
# are importable from a virtualenv -- confirm before reordering.
sys.path.insert(0, "/usr/lib/python3/dist-packages")
import dbus

# Applied to `systemctl status <timer>` output. Capture groups:
#   1: the token following the "*.timer;" path on the "Loaded:" line
#   2: the text between "Active:" and " since"
#   3: the "Trigger:" value up to and including the first ";", or "n/a"
SYSTEMCTL_STATUS_RE = r'Loaded:\s.*\.timer;\s(.*?);.*?\)\s*Active:\s(.*?) since.*?\s*Trigger:\s(.*?;|n\/a)'


class TimerInfo(BaseModel):
    """Snapshot of a timer's state as assembled by ``get_next_elapse``."""

    # Next trigger time parsed from the "Trigger:" line; None when it is "n/a".
    next: Union[datetime, None]
    # `next` minus the current time; None when `next` is None.
    time_left: Union[timedelta, None]
    # Last trigger time as returned by get_last_trigger(); may be None.
    last: Union[datetime, None]
    # Current time minus `last`; None when `last` is None.
    since_last: Union[timedelta, None]
    # Timer unit name the information was collected for.
    unit: str
    # True when capture group 1 of SYSTEMCTL_STATUS_RE equals "enabled".
    enabled: bool
    # True when the first word of capture group 2 equals "active".
    active: bool
    # Lower-cased text inside the trailing parentheses of the "Active:" value,
    # or None when the last word carries no "(".
    status: Union[str, None]


def _timedelta_to_seconds(delta: Union[timedelta, None]) -> int:
    """Return the full length of *delta* in whole seconds (0 for None/zero).

    ``timedelta.seconds`` is only the seconds *component* (0..86399) and
    silently drops whole days, so ``total_seconds()`` is used instead.
    """
    if not delta:
        return 0
    return int(delta.total_seconds())


def get_last_trigger(timer_name: str) -> Union[datetime, None]:
    """Return the last trigger time of *timer_name* from ``systemctl list-timers``.

    Returns None when the timer's row has fewer than 14 whitespace-separated
    fields (treated as "has not run yet") or when no row contains the timer
    name as a field. ``subprocess.CalledProcessError`` propagates if the
    command fails.
    """
    # Constant command, so no shell is needed.
    output = subprocess.check_output(["systemctl", "list-timers", "--all"]).decode('utf-8')
    rows = output.strip().split("\n")
    for row in rows[1:]:  # the first line is skipped (column header)
        fields = row.split()
        if timer_name not in fields:
            continue
        if len(fields) < 14:
            # Timer has not been run yet.
            return None
        # NOTE(review): assumes fields 7..10 hold the LAST column
        # (weekday, date, time, zone) -- this depends on the width of the
        # NEXT/LEFT columns; confirm against the systemd version in use.
        return parse(' '.join(fields[7:11]))
    return None


def get_next_elapse(timer_name: str) -> Tuple[TimerInfo | None, None | str]:
    """Collect trigger information for *timer_name* from ``systemctl status``.

    Returns ``(TimerInfo, None)`` on success, or ``(None, message)`` when
    ``systemctl status`` fails or its first output line does not mention the
    timer. Any error while parsing the output prints the raw output and a
    traceback, then exits the process with ``nagios.STATE_UNKNOWN``.
    """
    now = datetime.now()
    try:
        output = subprocess.check_output(["systemctl", "status", timer_name], universal_newlines=True)
    except subprocess.CalledProcessError as e:
        return None, f'systemctl status failed: {e}'

    if timer_name not in output.split('\n')[0]:
        return None, 'Timer not found'

    try:
        # If the pattern does not match, `parts` is None and the attribute
        # access below raises; the handler then reports UNKNOWN.
        parts = re.search(SYSTEMCTL_STATUS_RE, output)

        next_trigger_str = parts.group(3)
        if next_trigger_str.lower() != 'n/a':
            next_trigger = parse(next_trigger_str)
            # `now` is naive; stamp it with the parsed value's tzinfo so the
            # two can be subtracted.
            time_left = next_trigger - now.replace(tzinfo=next_trigger.tzinfo)
        else:
            next_trigger = None
            time_left = None

        last_trigger = get_last_trigger(timer_name)
        if last_trigger is not None:
            since_last = now.replace(tzinfo=last_trigger.tzinfo) - last_trigger
        else:
            since_last = None

        active_words = parts.group(2).split(' ')
        last_word = active_words[-1]
        status = last_word.lower().strip('(').strip(')') if '(' in last_word else None

        info = TimerInfo(
            next=next_trigger,
            time_left=time_left,
            last=last_trigger,
            since_last=since_last,
            enabled=parts.group(1).lower() == 'enabled',
            active=active_words[0].lower() == 'active',
            status=status,
            unit=timer_name,
        )
        return info, None
    except Exception:
        # Top-level boundary for parsing problems: dump what we saw and
        # report UNKNOWN rather than guessing.
        print(output)
        traceback.print_exc()
        sys.exit(nagios.STATE_UNKNOWN)


def check_timer(timer_name: str, expected_interval: Union[int, None] = None):
    """Check a systemd timer and report the result via ``quit_check``.

    Args:
        timer_name: Unit name; ``.timer`` is appended when missing.
        expected_interval: Optional maximum number of seconds allowed between
            the last and the next trigger. When both times are known and the
            gap is larger, the check is reported as CRITICAL.

    ``quit_check`` is expected to terminate the process; the explicit
    ``return`` statements only guard against fall-through if it does not.
    """
    if not timer_name.endswith('.timer'):
        timer_name = timer_name + '.timer'

    try:
        system_bus = dbus.SystemBus()
        systemd1 = system_bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
        manager = dbus.Interface(systemd1, 'org.freedesktop.systemd1.Manager')
        timer_unit_path = manager.GetUnit(timer_name)
        timer_unit = system_bus.get_object('org.freedesktop.systemd1', timer_unit_path)
        timer_properties = dbus.Interface(timer_unit, 'org.freedesktop.DBus.Properties')
        active_state = timer_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')

        if active_state != 'active':
            quit_check(f'{timer_name} is not enabled', nagios.STATE_CRIT)
            return

        next_elapse, err = get_next_elapse(timer_name)
        if err:
            quit_check(f'{err}', nagios.STATE_UNKNOWN)
            return

        if next_elapse.next:
            next_elapse_human = next_elapse.next.replace(tzinfo=tz.tzlocal()).strftime('%a %Y-%m-%d %H:%M %Z')
        else:
            next_elapse_human = 'N/A'
        remaining_time_human = humanfriendly.format_timespan(next_elapse.time_left) if next_elapse.time_left else 'N/A'
        since_last_human = humanfriendly.format_timespan(next_elapse.since_last) if next_elapse.since_last else 'N/A'

        perfdata_dict = {
            'remaining_time': {
                'value': _timedelta_to_seconds(next_elapse.time_left),
                'unit': 's',
                'min': 0,
            },
            'time_since_last': {
                'value': _timedelta_to_seconds(next_elapse.since_last),
                'unit': 's',
                'min': 0,
            },
        }

        timer_info = f'Next trigger time: {next_elapse_human}. Time until next trigger: {remaining_time_human}. Time since last trigger: {since_last_human}.'

        if expected_interval is not None and next_elapse.next and next_elapse.last:
            # Gap between the previous run and the upcoming one.
            actual_interval_seconds = (next_elapse.next - next_elapse.last).total_seconds()
            if actual_interval_seconds > expected_interval:
                quit_check(f'{timer_name} is active but the last trigger was more than the expected interval ago. {timer_info}', nagios.STATE_CRIT, perfdata_dict)
                return

        if next_elapse.active:
            # Only add the parenthesised sub-state when one was parsed;
            # `status` is None when the "Active:" value had no "(...)".
            status_suffix = f' ({next_elapse.status})' if next_elapse.status else ''
            state_text = ' is active' + status_suffix + '.'
        else:
            state_text = ' -> '
        quit_check(f'{timer_name}{state_text} {timer_info}', nagios.STATE_OK, perfdata_dict)
    except dbus.exceptions.DBusException:
        quit_check(f'{timer_name} does not exist or is disabled', nagios.STATE_CRIT)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--timer', required=True, help='The name of the timer to check.')
    parser.add_argument('-i', '--interval', type=int, help='The expected interval between timer triggers in seconds.')
    args = parser.parse_args()

    try:
        check_timer(args.timer, args.interval)
    except Exception as e:
        # Last-resort boundary: any unexpected failure is reported as UNKNOWN.
        print(f'UNKNOWN - exception "{e}"')
        traceback.print_exc()
        sys.exit(nagios.STATE_UNKNOWN)