153 lines
6.2 KiB
Python
Executable File
153 lines
6.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from typing import Tuple, Union
|
|
|
|
import humanfriendly
|
|
from dateparser import parse
|
|
from dateutil import tz
|
|
from pydantic import BaseModel
|
|
|
|
from checker import nagios
|
|
from checker.result import quit_check
|
|
|
|
sys.path.insert(0, "/usr/lib/python3/dist-packages")
|
|
import dbus
|
|
|
|
# Pattern applied to the text printed by `systemctl status <unit>.timer`.
# Adjacent raw-string pieces are concatenated by the parser into one pattern.
SYSTEMCTL_STATUS_RE = (
    # group 1: the field that follows the unit file path on the "Loaded:" line
    r'Loaded:\s.*\.timer;\s(.*?);.*?\)\s*'
    # group 2: everything between "Active:" and " since"
    r'Active:\s(.*?) since.*?\s*'
    # group 3: the text between "Trigger:" and the first ';'
    r'Trigger:\s(.*?);'
)
|
|
|
|
|
|
class TimerInfo(BaseModel):
    """Snapshot of one systemd timer as gathered by this check."""

    # When the timer is scheduled to fire next.
    next: datetime
    # Difference between `next` and the moment the check ran.
    time_left: timedelta
    # When the timer last fired; None if no last trigger was found.
    last: Union[datetime, None]
    # Difference between `last` and the moment the check ran; None when
    # `last` is None.
    since_last: Union[timedelta, None]
    # Full unit name of the timer.
    unit: str
    # True when systemctl reported the unit file state as "enabled".
    enabled: bool
    # True when the first word of the "Active:" state is "active".
    active: bool
    # Parenthesised sub-state from the "Active:" line (without the
    # parentheses), or None when there is none.
    status: Union[str, None]
|
|
|
|
|
|
def get_last_trigger(timer_name: str):
    """Return the time `timer_name` last fired, or None.

    Runs ``systemctl list-timers --all`` and reads the LAST column of the
    row whose whitespace-separated tokens include `timer_name`.

    The LAST value is cut out of the row using the character offsets of the
    ``LAST`` and ``PASSED`` header labels rather than fixed token positions:
    the number of tokens in the preceding LEFT column varies (e.g. "5h left"
    vs. "5h 30min left"), so token indexes are not stable.

    :param timer_name: Full unit name to look for (e.g. ``foo.timer``).
    :returns: Whatever ``dateparser.parse`` returns for the LAST column
        text, or None when the timer is not listed or has no last trigger.
    :raises subprocess.CalledProcessError: If systemctl exits non-zero.
    :raises ValueError: If the header lacks the expected column labels.
    """
    # Argument list instead of a shell string: no shell is needed here.
    output = subprocess.check_output(["systemctl", "list-timers", "--all"]).decode('utf-8')
    lines = output.strip().split("\n")

    header = lines[0]
    last_col = header.find('LAST')
    passed_col = header.find('PASSED')
    if last_col < 0 or passed_col <= last_col:
        raise ValueError(f'Unexpected "systemctl list-timers" header: {header!r}')

    for line in lines[1:]:
        if timer_name not in line.split():
            continue
        last_text = line[last_col:passed_col].strip()
        # Placeholder (or nothing) in the LAST column: timer has not run yet.
        if last_text in ('', 'n/a', '-'):
            return None
        return parse(last_text)

    # Timer is not present in the listing.
    return None
|
|
|
|
|
|
def get_next_elapse(timer_name: str) -> Tuple[TimerInfo | None, None | str]:
    """Collect scheduling details for a timer from ``systemctl status``.

    :param timer_name: Full unit name of the timer (e.g. ``foo.timer``).
    :returns: ``(TimerInfo, None)`` on success, or ``(None, message)`` when
        systemctl exits non-zero or the first line of its output does not
        mention `timer_name`.

    Any error while parsing the status output (including a failed regex
    match) prints the raw output and a traceback, then terminates the
    process with ``nagios.STATE_UNKNOWN``.
    """
    try:
        output = subprocess.check_output(["systemctl", "status", timer_name], universal_newlines=True)
    except subprocess.CalledProcessError as e:
        return None, f'systemctl status failed: {e}'

    if timer_name not in output.split('\n')[0]:
        return None, 'Timer not found'

    try:
        parts = re.search(SYSTEMCTL_STATUS_RE, output)

        next_trigger = parse(parts.group(3))
        # Build "now" with the same tzinfo as the parsed trigger time so the
        # two values can be subtracted.
        now = datetime.now(tz=next_trigger.tzinfo)
        time_left = next_trigger - now

        last_trigger = get_last_trigger(timer_name)
        if last_trigger is not None:
            # Elapsed time since the last run: now minus last, so a trigger
            # in the past yields a positive duration.
            since_last = now - last_trigger
        else:
            since_last = None

        # group(2) looks like "active (waiting)": first word is the state,
        # an optional trailing "(...)" word is the sub-state.
        state_words = parts.group(2).split(' ')
        trailing_word = state_words[-1]
        if '(' in trailing_word:
            status = trailing_word.lower().strip('(').strip(')')
        else:
            status = None

        return TimerInfo(
            next=next_trigger,
            time_left=time_left,
            last=last_trigger,
            since_last=since_last,
            enabled=parts.group(1).lower() == 'enabled',
            active=state_words[0].lower() == 'active',
            status=status,
            unit=timer_name,
        ), None
    except Exception:
        # Dump what systemctl printed so the failure can be diagnosed from
        # the check output, then report UNKNOWN.
        print(output)
        traceback.print_exc()
        sys.exit(nagios.STATE_UNKNOWN)
|
|
|
|
|
|
def check_timer(timer_name: str, expected_interval: int | None = None):
    """Check a systemd timer and report the result through ``quit_check``.

    The unit's ``ActiveState`` is read over D-Bus. For an active timer the
    next/last trigger times are gathered with ``get_next_elapse`` and
    reported together with perfdata.

    :param timer_name: Timer unit name; ``.timer`` is appended if missing.
    :param expected_interval: Optional maximum number of seconds allowed
        between the last trigger and the next scheduled trigger. Exceeding
        it is reported as critical.

    Reported states:
      * UNKNOWN  - ``get_next_elapse`` returned an error message.
      * CRITICAL - the unit is not active, a D-Bus error occurred, or the
        interval between last and next trigger exceeds `expected_interval`.
      * OK       - otherwise.
    """
    if not timer_name.endswith('.timer'):
        timer_name = timer_name + '.timer'

    try:
        system_bus = dbus.SystemBus()
        systemd1 = system_bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
        manager = dbus.Interface(systemd1, 'org.freedesktop.systemd1.Manager')
        timer_unit_path = manager.GetUnit(timer_name)
        timer_unit = system_bus.get_object('org.freedesktop.systemd1', timer_unit_path)
        timer_properties = dbus.Interface(timer_unit, 'org.freedesktop.DBus.Properties')
        active_state = timer_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')

        if active_state == 'active':
            next_elapse, err = get_next_elapse(timer_name)
            if err:
                # NOTE(review): the code below assumes quit_check() ends the
                # process; next_elapse is None on this path - confirm.
                quit_check(f'{err}', nagios.STATE_UNKNOWN)

            # NOTE(review): replace() relabels the timestamp with the local
            # zone rather than converting it - confirm this is intended.
            next_elapse_human = next_elapse.next.replace(tzinfo=tz.tzlocal()).strftime('%a %Y-%m-%d %H:%M %Z')
            remaining_time_human = humanfriendly.format_timespan(next_elapse.time_left)

            # Report elapsed time as a non-negative duration regardless of
            # the sign it was stored with. Test against None explicitly so a
            # zero duration is not shown as "N/A".
            since_last = next_elapse.since_last
            elapsed = abs(since_last) if since_last is not None else None
            since_last_human = humanfriendly.format_timespan(elapsed) if elapsed is not None else 'N/A'

            # total_seconds() covers the whole duration; timedelta.seconds is
            # only the sub-day remainder and would drop whole days. Values
            # are clamped to the declared perfdata minimum of 0.
            perfdata_dict = {
                'remaining_time': {
                    'value': max(0, int(next_elapse.time_left.total_seconds())),
                    'unit': 's',
                    'min': 0
                },
                'time_since_last': {
                    'value': int(elapsed.total_seconds()) if elapsed is not None else 0,
                    'unit': 's',
                    'min': 0
                }
            }

            timer_info = f'Next trigger time: {next_elapse_human}. Time until next trigger: {remaining_time_human}. Time since last trigger: {since_last_human}.'

            if expected_interval is not None and next_elapse.next and next_elapse.last:
                # Gap between the previous run and the next scheduled run.
                actual_interval = next_elapse.next - next_elapse.last
                actual_interval_seconds = actual_interval.total_seconds()

                if actual_interval_seconds > expected_interval:
                    quit_check(f'{timer_name} is active but the last trigger was more than the expected interval ago. {timer_info}', nagios.STATE_CRIT, perfdata_dict)

            if next_elapse.active:
                # Only append the sub-state when there is one; status may be
                # None and cannot be concatenated to a string.
                status_suffix = f' ({next_elapse.status})' if next_elapse.status else ''
                state_text = f' is active{status_suffix}.'
            else:
                state_text = ' -> '
            quit_check(f'{timer_name}{state_text} {timer_info}', nagios.STATE_OK, perfdata_dict)
        else:
            quit_check(f'{timer_name} is not enabled', nagios.STATE_CRIT)
    except dbus.exceptions.DBusException:
        quit_check(f'{timer_name} does not exist or is disabled', nagios.STATE_CRIT)
|
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point: parse the options, run the check, and turn
    # any unexpected exception into an UNKNOWN result.
    cli = argparse.ArgumentParser()
    cli.add_argument('-t', '--timer', required=True, help='The name of the timer to check.')
    cli.add_argument('-i', '--interval', type=int, help='The expected interval between timer triggers in seconds.')
    options = cli.parse_args()

    try:
        check_timer(options.timer, options.interval)
    except Exception as exc:
        print(f'UNKNOWN - exception "{exc}"')
        traceback.print_exc()
        sys.exit(nagios.STATE_UNKNOWN)
|