# monitor/monitor.py
#!/bin/python3
# HTTP, DNS, and IP monitoring script
import datetime
import functools
import json
import logging
import os
import socket
import time
from collections import namedtuple
from typing import Callable, List

import git
import pydig
import pytz
import requests
REPO_ROOT = os.getenv("STATUS_REPO", "status-repo")
class Formatter(logging.Formatter):
COLOR_RST = "\033[0m"
COLORS = {
"reset": "\033[0m",
"cyan": "\033[36m",
"red": "\033[31m",
"boldred": "\033[1;31m",
"green": "\033[32m",
"blue": "\033[34m",
"yellow": "\033[33m",
}
LOGGING_COLORS = {
logging.DEBUG: "blue",
logging.INFO: "green",
logging.WARNING: "yellow",
logging.WARN: "yellow",
logging.ERROR: "red",
logging.CRITICAL: "boldred",
}
def __init__(self, exclude_time_for: int = 1, disable_colors: bool = False) -> None:
"""
Fancy formatter
Args:
exclude_time_for (int): number of seconds that must have passed
for another timestamp to be shown
max_width (int): max log width, defaults to 80 characters
"""
super().__init__()
self.last_timestamp = 0
self.exclude_time_for = exclude_time_for
self.disable_colors = disable_colors
def c(self, color: str) -> str:
if self.disable_colors is True:
return ""
else:
return self.COLORS[color]
def format(self, record: logging.LogRecord) -> str:
output = ""
if self.last_timestamp + self.exclude_time_for < record.created:
dt = datetime.datetime.fromtimestamp(record.created)
output += (
self.c("cyan") + dt.strftime("[%d/%m %H:%M:%S]") + self.COLOR_RST + " "
)
self.last_timestamp = record.created
else:
output += " " * 17
output += self.c(self.LOGGING_COLORS.get(record.levelno, "reset"))
output += f"{record.levelname.upper()[:3]}{self.COLOR_RST} "
output += record.msg % record.args
return output
logger = logging.getLogger(__name__)
# last states of services to keep from detecting downtime repeatedly
last_states: dict[str, bool] = {}
RequirementCheck = Callable[..., bool]
MonitorDict = dict[str, dict[RequirementCheck, dict]]
Fail = namedtuple("Fail", ("service_name", "failed_requirements"))
# publish a failed service, no dependents so edit at will
def fail(failed: List[Fail]):
repo = git.Repo(REPO_ROOT) # type: ignore
origin = repo.remote("origin")
try:
origin.pull(kill_after_timeout=10)
except git.CommandError:
logger.error("failed to pull from origin")
return
for service_name, failed_requirements in failed:
if not last_states.get(service_name, True):
continue # we've already seen the service down
now = datetime.datetime.now(tz=pytz.timezone("Europe/Prague"))
filename = f"src/content/{now.strftime('%Y-%m-%d-%f')}-downtime.md"
with open(REPO_ROOT + "/" + filename, "w+") as f:
lines = [
"---\n",
f"title: {service_name} downtime\n",
f"date: {now.strftime('%Y-%m-%d %H:%M:%S %z')}\n",
"severity: down\n",
"affected:\n",
f" - {service_name}\n",
"---\n",
f"Automatic checks for {service_name} have failed. "
f"Requirements {[r.__name__ for r in failed_requirements]} failed.\n",
]
f.writelines(lines)
repo.git.add(filename)
repo.git.commit("-m", f"{service_name} downtime")
try:
origin.push(kill_after_timeout=10)
except git.CommandError:
logger.error("failed to push to origin, resetting working tree")
repo.git.reset("origin/HEAD", working_tree=True)
logger.info("failed services published")
def self_check() -> bool:
try:
if requests.get("https://google.com/").status_code != 200:
return False
except requests.exceptions.ConnectionError:
return False
return True
def retry(n: int = 3, sleep: int = 5) -> Callable[[RequirementCheck], RequirementCheck]:
"""Decorator maker for calling a function multiple times with sleep time between calls."""
def inner_retry(func: RequirementCheck) -> RequirementCheck:
def inner(*args, **kwargs) -> bool:
passed = False
for _ in range(n - 1):
passed = func(*args, **kwargs)
if passed:
break
time.sleep(sleep)
return passed
# preserve names in log (instead of each requirement being called "inner")
inner.__name__ = func.__name__
return inner
return inner_retry
@retry()
def http_requirement(url: str, code: int) -> bool:
try:
resp = requests.head(url, headers={"User-agent": "monitoring (v1)"})
except requests.exceptions.ConnectionError:
return False
else:
return resp.status_code == code
def dns_requirement(name: str, ip: str) -> bool:
try:
query = pydig.query(name, "A")
except ConnectionError:
return False
return query is not None and (ip == "*" or ip in query)
@retry()
def ip_requirement(ip: str, port: int, prot: str) -> bool:
protocol = socket.SOCK_STREAM if prot == "tcp" else socket.SOCK_DGRAM
sock = socket.socket(type=protocol)
try:
sock.connect((ip, port))
except ConnectionError:
return False
sock.close()
return True
def check(monitors: MonitorDict):
failed_services: List[Fail] = []
for service, requirements in monitors.items():
logger.info(f"checking service {service}")
failed = []
for requirement, args in requirements.items():
passed = requirement(**args)
if not passed:
if not self_check():
logger.error(
"self-check failed, assuming bad connection and aborting"
)
return
logger.warning(f" {requirement.__name__}({args})")
failed.append(requirement)
time.sleep(1)
if failed:
failed_services.append(Fail(service, failed))
if failed_services:
fail(failed_services)
# update last_states
for service in monitors.keys():
last_states[service] = True
for fs in failed_services:
last_states[fs.service_name] = False
logger.debug("check complete")
monitors_: MonitorDict = {
"f.bain.cz": {
http_requirement: {"url": "https://f.bain.cz/status", "code": 200},
# dns_requirement: {"name": "f.bain.cz", "ip": "*"},
# ip_requirement: {"ip": "f.bain.cz", "port": 80, "prot": "tcp"}
},
"s.bain.cz": {
http_requirement: {"url": "https://s.bain.cz/", "code": 200},
},
"git.bain.cz": {
http_requirement: {"url": "https://git.bain.cz/", "code": 200},
},
"ts3.bain.cz": {ip_requirement: {"ip": "ts3.bain.cz", "port": 9987, "prot": "udp"}},
}
if __name__ == "__main__":
handler = logging.StreamHandler()
handler.setFormatter(Formatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
# we assume this is going to be run in a cron job as the gitpython
# library is slowly leaking memory apparently
if os.path.exists("last-state"):
with open("last-state", "r") as f:
last_states = json.load(f)
check(monitors_)
with open("last-state", "w+") as f:
json.dump(last_states, f)