#!/bin/python3
"""HTTP, DNS, and IP monitoring script."""
from collections import namedtuple
import datetime
import json
import logging
import os
import socket
import time
from typing import Callable, List

import requests
import pydig
import git
import pytz

# root of the git repository the status site is generated from
REPO_ROOT = os.getenv("STATUS_REPO", "status-repo")


class Formatter(logging.Formatter):
    """Colorized console formatter that rate-limits repeated timestamps."""

    COLOR_RST = "\033[0m"
    COLORS = {
        "reset": "\033[0m",
        "cyan": "\033[36m",
        "red": "\033[31m",
        "boldred": "\033[1;31m",
        "green": "\033[32m",
        "blue": "\033[34m",
        "yellow": "\033[33m",
    }
    # logging.WARN is an alias of logging.WARNING (same integer), so a
    # single WARNING entry covers both
    LOGGING_COLORS = {
        logging.DEBUG: "blue",
        logging.INFO: "green",
        logging.WARNING: "yellow",
        logging.ERROR: "red",
        logging.CRITICAL: "boldred",
    }

    def __init__(self, exclude_time_for: int = 1, disable_colors: bool = False) -> None:
        """
        Fancy formatter.

        Args:
            exclude_time_for (int): number of seconds that must have passed
                for another timestamp to be shown
            disable_colors (bool): when True, no ANSI escape sequences are
                emitted at all (useful when output is not a terminal)
        """
        super().__init__()
        self.last_timestamp = 0.0  # record.created of the last printed timestamp
        self.exclude_time_for = exclude_time_for
        self.disable_colors = disable_colors

    def c(self, color: str) -> str:
        """Return the ANSI escape for ``color``, or "" when colors are disabled."""
        if self.disable_colors is True:
            return ""
        return self.COLORS[color]

    def format(self, record: logging.LogRecord) -> str:
        output = ""
        if self.last_timestamp + self.exclude_time_for < record.created:
            dt = datetime.datetime.fromtimestamp(record.created)
            # route reset through c() as well so disable_colors suppresses it
            output += (
                self.c("cyan") + dt.strftime("[%d/%m %H:%M:%S]") + self.c("reset") + " "
            )
            self.last_timestamp = record.created
        else:
            output += " " * 17  # width of "[dd/mm HH:MM:SS] "
        output += self.c(self.LOGGING_COLORS.get(record.levelno, "reset"))
        output += f"{record.levelname.upper()[:3]}{self.c('reset')} "
        # getMessage() applies %-args only when present; a bare
        # ``record.msg % record.args`` raises on messages containing "%"
        output += record.getMessage()
        return output


logger = logging.getLogger(__name__)

# last states of services to keep from detecting downtime repeatedly
last_states: dict[str, bool] = {}

RequirementCheck = Callable[..., bool]
MonitorDict = dict[str, dict[RequirementCheck, dict]]
Fail = namedtuple("Fail", ("service_name", "failed_requirements"))
# publish a failed service, no dependents so edit at will
def fail(failed: "List[Fail]"):
    """Publish a downtime post for every newly failed service.

    Pulls the status repository, writes one markdown incident file per
    service that was previously up, commits each file, then pushes once at
    the end.  Services already recorded as down in ``last_states`` are
    skipped so the same outage is not reported repeatedly.
    """
    repo = git.Repo(REPO_ROOT)  # type: ignore
    origin = repo.remote("origin")
    try:
        origin.pull(kill_after_timeout=10)
    except git.CommandError:
        logger.error("failed to pull from origin")
        return
    for service_name, failed_requirements in failed:
        if not last_states.get(service_name, True):
            continue  # we've already seen the service down
        now = datetime.datetime.now(tz=pytz.timezone("Europe/Prague"))
        # %f (microseconds) keeps same-day incident filenames unique
        filename = f"src/content/{now.strftime('%Y-%m-%d-%f')}-downtime.md"
        with open(REPO_ROOT + "/" + filename, "w+") as f:
            lines = [
                "---\n",
                f"title: {service_name} downtime\n",
                f"date: {now.strftime('%Y-%m-%d %H:%M:%S %z')}\n",
                "severity: down\n",
                "affected:\n",
                f" - {service_name}\n",
                "---\n",
                f"Automatic checks for {service_name} have failed. "
                f"Requirements {[r.__name__ for r in failed_requirements]} failed.\n",
            ]
            f.writelines(lines)
        repo.git.add(filename)
        repo.git.commit("-m", f"{service_name} downtime")
    try:
        origin.push(kill_after_timeout=10)
    except git.CommandError:
        # drop the local commits so the next run starts from a clean tree
        logger.error("failed to push to origin, resetting working tree")
        repo.git.reset("origin/HEAD", working_tree=True)
    logger.info("failed services published")


def self_check() -> bool:
    """Return True when this host's own connectivity looks healthy.

    Used to tell "the monitored service is down" apart from "this machine
    has no internet", in which case a check run must be aborted.
    """
    try:
        # timeout prevents the whole run from hanging on a dead connection;
        # RequestException also covers Timeout, not just ConnectionError
        return requests.get("https://google.com/", timeout=10).status_code == 200
    except requests.exceptions.RequestException:
        return False


def retry(n: int = 3, sleep: int = 5) -> "Callable[[RequirementCheck], RequirementCheck]":
    """Decorator maker that calls a requirement up to ``n`` times.

    Args:
        n (int): maximum number of attempts (the previous implementation
            made only ``n - 1`` attempts — off by one)
        sleep (int): seconds to wait between attempts

    Returns:
        A decorator wrapping a requirement check; the wrapper returns True
        as soon as one attempt passes, False when all attempts fail.
    """

    def inner_retry(func: "RequirementCheck") -> "RequirementCheck":
        def inner(*args, **kwargs) -> bool:
            for attempt in range(n):
                if func(*args, **kwargs):
                    return True
                if attempt < n - 1:
                    # only sleep between attempts, not after the last one
                    time.sleep(sleep)
            return False

        # preserve names in log (instead of each requirement being called "inner")
        inner.__name__ = func.__name__
        return inner

    return inner_retry


@retry()
def http_requirement(url: str, code: int) -> bool:
    """Check that a HEAD request to ``url`` answers with status ``code``."""
    try:
        resp = requests.head(
            url, headers={"User-agent": "monitoring (v1)"}, timeout=10
        )
    except requests.exceptions.RequestException:
        return False
    else:
        return resp.status_code == code


def dns_requirement(name: str, ip: str) -> bool:
    """Check that ``name`` resolves to ``ip`` ("*" accepts any A record)."""
    try:
        query = pydig.query(name, "A")
    except ConnectionError:
        return False
    return query is not None and (ip == "*" or ip in query)


@retry()
def ip_requirement(ip: str, port: int, prot: str) -> bool:
    """Check that a socket connect to ``ip``:``port`` succeeds.

    Args:
        ip (str): host name or address
        port (int): port number
        prot (str): "tcp" for SOCK_STREAM, anything else for SOCK_DGRAM
    """
    protocol = socket.SOCK_STREAM if prot == "tcp" else socket.SOCK_DGRAM
    # context manager closes the socket even when connect() raises
    # (the old code leaked it on failure)
    with socket.socket(type=protocol) as sock:
        sock.settimeout(10)
        try:
            sock.connect((ip, port))
        except OSError:
            # OSError covers ConnectionError, timeouts and DNS failures
            # (socket.gaierror), which previously escaped uncaught
            return False
    return True


def check(monitors: "MonitorDict"):
    """Run every requirement of every service and publish new failures.

    Aborts without publishing when a failure coincides with a failed
    self-check (i.e. our own connection is bad).  Updates ``last_states``
    for all checked services afterwards.
    """
    failed_services: List[Fail] = []
    for service, requirements in monitors.items():
        logger.info("checking service %s", service)
        failed = []
        for requirement, args in requirements.items():
            passed = requirement(**args)
            if not passed:
                if not self_check():
                    logger.error(
                        "self-check failed, assuming bad connection and aborting"
                    )
                    return
                logger.warning(" %s(%s)", requirement.__name__, args)
                failed.append(requirement)
            time.sleep(1)  # be gentle between probes
        if failed:
            failed_services.append(Fail(service, failed))
    if failed_services:
        fail(failed_services)
    # update last_states
    for service in monitors:
        last_states[service] = True
    for fs in failed_services:
        last_states[fs.service_name] = False
    logger.debug("check complete")


monitors_: "MonitorDict" = {
    "f.bain.cz": {
        http_requirement: {"url": "https://f.bain.cz/status", "code": 200},
        # dns_requirement: {"name": "f.bain.cz", "ip": "*"},
        # ip_requirement: {"ip": "f.bain.cz", "port": 80, "prot": "tcp"}
    },
    "s.bain.cz": {
        http_requirement: {"url": "https://s.bain.cz/", "code": 200},
    },
    "git.bain.cz": {
        http_requirement: {"url": "https://git.bain.cz/", "code": 200},
    },
    "ts3.bain.cz": {ip_requirement: {"ip": "ts3.bain.cz", "port": 9987, "prot": "udp"}},
}

if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(Formatter())
    logging.basicConfig(level=logging.INFO, handlers=[handler])

    # we assume this is going to be run in a cron job as the gitpython
    # library is slowly leaking memory apparently
    if os.path.exists("last-state"):
        with open("last-state", "r") as f:
            last_states = json.load(f)

    check(monitors_)

    with open("last-state", "w+") as f:
        json.dump(last_states, f)