"""Flask application that relays alert notifications POSTed to /alertmanager.

The payload keys read here (``version``, ``status``, ``alerts``, and per-alert
``labels`` / ``annotations``) presumably follow the Prometheus Alertmanager
webhook format -- confirm against the sender's configuration.
"""

import typing as t
from collections import defaultdict

from flask import Flask, request

from . import signal
from . import configuration

app = Flask(__name__)


@app.route("/")
def root() -> t.Tuple[str, int]:
    """Answer requests to the root URL with a fixed 400 response."""
    return "Not supported.", 400


def _build_message(alerts: t.List[t.Any]) -> str:
    """Render the notification text for a list of alert entries.

    Each entry is expected to be a mapping with ``labels.severity``,
    ``labels.alertname`` and ``annotations.description``.  Entries that do
    not have that shape are listed as malformed rather than aborting the
    whole notification.

    Args:
        alerts: The ``alerts`` list taken from the request payload.

    Returns:
        A header line with the total count and a per-severity tally,
        a blank line, then one bullet line per alert.
    """
    severities: t.DefaultDict[t.Any, int] = defaultdict(int)
    alert_lines: t.List[str] = []
    for alert in alerts:
        try:
            # The severity is tallied before the line is formatted, so an
            # alert with a severity but a missing name/description is still
            # counted in the header.
            severities[alert["labels"]["severity"]] += 1
            alert_lines.append(
                f"<{alert['labels']['alertname']}> "
                f"{alert['annotations']['description']}"
            )
        except (KeyError, TypeError) as exn:
            # KeyError: a required field is missing.
            # TypeError: the alert (or labels/annotations) is not a mapping,
            # or the severity value is unhashable.
            alert_lines.append(f"(malformed alert — {exn})")

    summary = ", ".join(f"{count} {typ}" for typ, count in severities.items())
    bullets = "\n".join("* " + line for line in alert_lines)
    return f"[PROMETHEUS] {len(alerts)} firing: {summary}\n\n{bullets}"


@app.route("/alertmanager", methods=["POST"])
def alertmanager() -> t.Tuple[str, int]:
    """Validate a posted alert payload and forward a summary of it.

    The JSON body must be an object with ``version`` equal to 4, an
    ``alerts`` list and a ``status`` field.  Payloads whose status is not
    ``"firing"`` are acknowledged and ignored.  Otherwise a text summary is
    passed to ``signal.signal_send`` for the configured default recipient.

    Returns:
        A ``(body, status_code)`` tuple: ``("OK", 200)`` on success or when
        the payload is ignored, and a short reason with 400 when the payload
        is invalid.
    """
    data = request.get_json(cache=False)
    # get_json may yield None or any JSON value (list, string, number); only
    # an object can carry the fields checked below.
    if not isinstance(data, dict):
        return "Bad request", 400

    try:
        version_ok = int(data["version"]) == 4
    except (KeyError, TypeError, ValueError, OverflowError):
        # Missing, null, non-numeric or non-finite version values are treated
        # the same as an unsupported version instead of crashing the handler.
        version_ok = False
    if not version_ok:
        return "Bad API version", 400

    alerts = data.get("alerts")
    if not isinstance(alerts, list):
        return "No alerts", 400
    if "status" not in data:
        return "No status", 400
    if data["status"] != "firing":
        # Ignore
        return "OK", 200

    msg = _build_message(alerts)
    signal.signal_send(
        configuration.RECIPIENTS[configuration.DEFAULT_RECIPIENT], msg
    )
    return "OK", 200