diff --git a/check_openwrt_bssid.py b/check_openwrt_bssid.py index f803b88..dcc864f 100755 --- a/check_openwrt_bssid.py +++ b/check_openwrt_bssid.py @@ -3,6 +3,7 @@ import argparse import json import re import sys +import time import traceback import paramiko @@ -11,90 +12,124 @@ from checker import nagios from checker.result import quit_check from checker.types import try_int, try_float +WIFI_2 = list(range(1, 12)) +WIFI_5 = list(range(32, 178)) + def main(args): - command = 'iwinfo scan0 scan' + command = f'iwinfo {args.interface} scan' ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - try: - ssh.connect(args.host, username='root') - except paramiko.ssh_exception.AuthenticationException: - print(f'SSH authentication to {args.host} failed! Make sure your key exists on the remote system or copy it over with ssh-copy-id') - sys.exit(1) + for _ in range(args.retries + 1): + try: + ssh.connect(args.host, username='root') + except paramiko.ssh_exception.AuthenticationException: + quit_check(f'SSH authentication to root@{args.host} failed! Make sure your key exists on the remote system or copy it over with ssh-copy-id', nagios.STATE_UNKNOWN) + sys.exit(nagios.STATE_UNKNOWN) + except paramiko.ssh_exception.NoValidConnectionsError: + quit_check(f'SSH connection failed to {args.host}', nagios.STATE_UNKNOWN) - stdin, stdout, stderr = ssh.exec_command(command) - output = [str(x).strip('\n').strip(' ') for x in stdout.readlines()] + stdin, stdout, stderr = ssh.exec_command(command) + if stdout.channel.recv_exit_status() != 0: + quit_check(f'command failed:\nSTDOUT: {"".join(stdout.readlines())}\nSTDERR: {"".join(stderr.readlines())}', nagios.STATE_UNKNOWN) + output = [str(x).strip('\n').strip(' ') for x in stdout.readlines()] - cells = {} + cells = {} - for line in output: - if line.startswith('Cell '): - address = re.search(r'Address:\s(.*?)$', line).group(1) - cells[address] = {} - elif line.startswith('ESSID: '): - if line == 'ESSID: unknown': - ssid = None - else: - ssid = re.search(r'ESSID:\s"(.*?)"$', line).group(1) - cells[list(cells.keys())[-1]]['ssid'] = ssid - elif line.startswith('Mode: '): - m = re.search(r'Mode: .*?\s\sFrequency: ([0-9.]*) GHz\s\sBand: ([0-9.]*) GHz\s\sChannel: ([0-9]*)$', line) - cells[list(cells.keys())[-1]]['freq'] = try_float(m.group(1)) - cells[list(cells.keys())[-1]]['band'] = try_float(m.group(2)) - cells[list(cells.keys())[-1]]['chan'] = try_int(m.group(3)) - elif line.startswith('Encryption: '): - enc = re.search(r'Encryption: (.*?)$', line).group(1) - cells[list(cells.keys())[-1]]['enc'] = enc - elif line.startswith('Channel Width: '): - width = re.search(r'Channel Width: (.*?)$', line).group(1) - cells[list(cells.keys())[-1]]['chan_width'] = width - elif line.startswith('Signal: '): - m = re.search(r'Signal: (-[0-9]*) dBm\s\sQuality: ([0-9]*)/([0-9]*)$', line) - cells[list(cells.keys())[-1]]['signal'] = try_int(m.group(1)) - cells[list(cells.keys())[-1]]['quality'] = (try_int(m.group(2)), try_int(m.group(3))) + for line in output: + if line.startswith('Cell '): + address = re.search(r'Address:\s(.*?)$', line).group(1) + cells[address] = {} + elif line.startswith('ESSID: '): + if line == 'ESSID: unknown': + ssid = None + else: + ssid = re.search(r'ESSID:\s"(.*?)"$', line).group(1) + cells[list(cells.keys())[-1]]['ssid'] = ssid + elif line.startswith('Mode: '): + m = re.search(r'Mode: .*?\s\sFrequency: ([0-9.]*) GHz\s\sBand: ([0-9.]*) GHz\s\sChannel: ([0-9]*)$', line) + if not m: + # Older OpenWRT versions display this info differently. + m = re.search(r'Mode: .*?\s\sChannel: ([0-9]*)$', line) + chan = try_int(m.group(1)) + if chan in WIFI_2: + band = 2.4 + elif chan in WIFI_5: + band = 5 + else: + raise Exception + freq = -1 + else: + freq = try_float(m.group(1)) + band = try_float(m.group(2)) + chan = try_int(m.group(3)) + cells[list(cells.keys())[-1]]['freq'] = freq + cells[list(cells.keys())[-1]]['band'] = band + cells[list(cells.keys())[-1]]['chan'] = chan + elif line.startswith('Encryption: '): + enc = re.search(r'Encryption: (.*?)$', line).group(1) + cells[list(cells.keys())[-1]]['enc'] = enc + elif line.startswith('Channel Width: '): + width = re.search(r'Channel Width: (.*?)$', line).group(1) + cells[list(cells.keys())[-1]]['chan_width'] = width + elif line.startswith('Signal: '): + m = re.search(r'Signal: (-[0-9]*) dBm\s\sQuality: ([0-9]*)/([0-9]*)$', line) + cells[list(cells.keys())[-1]]['signal'] = try_int(m.group(1)) + cells[list(cells.keys())[-1]]['quality'] = (try_int(m.group(2)), try_int(m.group(3))) - if args.print: - print(json.dumps(cells)) - sys.exit(nagios.STATE_UNKNOWN) + if args.print: + print(json.dumps(cells)) + sys.exit(nagios.STATE_UNKNOWN) - if args.target_mac not in list(cells.keys()): - quit_check(f'AP address not found: {args.target_mac}', nagios.STATE_CRIT) + if args.target_mac not in list(cells.keys()): + time.sleep(10) + continue - ap_data = cells[args.target_mac] - computed_quality = int(try_float(ap_data['quality'][0]) / try_float(ap_data['quality'][1]) * 100) - perfdata = { - 'signal': {'value': try_int(ap_data['signal']), 'warn': args.signal_warn, 'crit': args.signal_crit}, - 'quality': {'value': int(computed_quality), 'warn': args.quality_warn, 'crit': args.quality_crit, 'unit': '%'}, - 'freq': {'value': ap_data['freq']}, - 'band': {'value': ap_data['band']}, - 'chan': {'value': ap_data['chan']}, - 'chan_width': {'value': ap_data['chan_width']}, - } + ap_data = cells[args.target_mac] + computed_quality = int(try_float(ap_data['quality'][0]) / try_float(ap_data['quality'][1]) * 100) - if ap_data['ssid'] != args.expected_ssid: - quit_check(f'AP does not have the right SSID.\n{json.dumps(ap_data)}', nagios.STATE_CRIT, perfdata=perfdata) + chan_width = ap_data['chan_width'].strip(' MHz') + if re.search(r'[a-zA-Z]', chan_width): + # OpenWRT may be unsure about the channel width. + chan_width = -1 - if ap_data['signal'] <= args.signal_crit: - quit_check(f'Signal strength is weak: {ap_data["signal"]} dBm', nagios.STATE_CRIT, perfdata=perfdata) + perfdata = { + 'signal': {'value': try_int(ap_data['signal']), 'warn': args.signal_warn, 'crit': args.signal_crit}, + 'quality': {'value': int(computed_quality), 'warn': args.quality_warn, 'crit': args.quality_crit, 'unit': '%'}, + 'freq': {'value': ap_data['freq']}, + 'band': {'value': ap_data['band']}, + 'chan': {'value': ap_data['chan']}, + 'chan_width': {'value': chan_width}, + } - if ap_data['signal'] <= args.signal_warn: - quit_check(f'Signal strength is weak: {ap_data["signal"]} dBm', nagios.STATE_WARN, perfdata=perfdata) + if ap_data['ssid'] != args.expected_ssid: + quit_check(f'AP does not have the right SSID.\n{json.dumps(ap_data)}', nagios.STATE_CRIT, perfdata=perfdata) - if computed_quality <= args.quality_crit: - quit_check(f'Signal quality is low: {ap_data["quality"][0]}/{ap_data["quality"][1]} ({computed_quality}%)', nagios.STATE_CRIT, perfdata=perfdata) + if ap_data['signal'] <= args.signal_crit: + quit_check(f'Signal strength is weak: {ap_data["signal"]} dBm', nagios.STATE_CRIT, perfdata=perfdata) - if computed_quality <= args.quality_warn: - quit_check(f'Signal quality is low: {ap_data["quality"][0]}/{ap_data["quality"][1]} ({computed_quality}%)', nagios.STATE_WARN, perfdata=perfdata) + if ap_data['signal'] <= args.signal_warn: + quit_check(f'Signal strength is weak: {ap_data["signal"]} dBm', nagios.STATE_WARN, perfdata=perfdata) - quit_check(f"{ap_data['ssid']} ({args.target_mac}) is healthy. Signal: {ap_data['signal']} dBm. Quality: {computed_quality}%. Frequency: {ap_data['freq']} GHz. Band: {ap_data['band']} GHz. Channel: {ap_data['chan']} @ {ap_data['chan_width']} width. Encryption: {ap_data['enc']}.", - nagios.STATE_OK, - perfdata=perfdata) + if computed_quality <= args.quality_crit: + quit_check(f'Signal quality is low: {ap_data["quality"][0]}/{ap_data["quality"][1]} ({computed_quality}%)', nagios.STATE_CRIT, perfdata=perfdata) + + if computed_quality <= args.quality_warn: + quit_check(f'Signal quality is low: {ap_data["quality"][0]}/{ap_data["quality"][1]} ({computed_quality}%)', nagios.STATE_WARN, perfdata=perfdata) + + quit_check(f"{ap_data['ssid']} ({args.target_mac}) is healthy. Signal: {ap_data['signal']} dBm. Quality: {computed_quality}%. Frequency: {ap_data['freq']} GHz. Band: {ap_data['band']} GHz. Channel: {ap_data['chan']} @ {ap_data['chan_width']} width. Encryption: {ap_data['enc']}.", + nagios.STATE_OK, + perfdata=perfdata) + + quit_check(f'AP address not found: {args.target_mac}', nagios.STATE_CRIT) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--host', required=True, help='The host to SSH into.') + parser.add_argument('--interface', required=True, help='The wireless interface to use (example: `wlan0`).') + parser.add_argument('--retries', default=3, help='If the target AP is not found, do this many retries.') parser.add_argument('--print', action='store_true', help='Print the found APs.') parser.add_argument('--target-mac', required=True, help='The MAC address of the target AP.') parser.add_argument('--expected-ssid', required=True, help="The AP's expected SSID.")